Instantiation Pipeline — Architecture & Implementation Plan¶
Status: ✅ Complete — All 7 phases delivered (2026-03-02) Created: 2026-03-01 Author: AI Architect (lcm-senior-architect) ADR: ADR-031 (Checkpoint-Based Instantiation Pipeline) Depends On: ADR-020 (Session Consolidation), ADR-029 (Port Template), ADR-030 (Resource Observation), AD-DRAIN-001 (Watch Drain-Loop Fix) Scope: Cross-service refactor of the LabletSession instantiation lifecycle (lablet-controller, control-plane-api, resource-scheduler, worker-controller, lcm-core)
Executive Summary¶
The current _handle_instantiating() in lablet_reconciler.py is a monolithic waterfall — a single async method that sequentially performs lab resolution, lab import, lab start, LDS provisioning, device mapping, and the READY transition. If any step fails, the entire instantiation restarts from zero. There is no checkpoint tracking, no partial recovery, and no visibility into progress.
This document designs a checkpoint-based instantiation pipeline — a dependency-ordered sequence of idempotent, resumable steps whose progress is persisted to the LabletSession aggregate after each step. Combined with the drain-loop fix (AD-DRAIN-001), this ensures that self-induced watch events from each step's API call are processed in the same reconciliation cycle, enabling reliable multi-step state advancement.
Key Design Principles¶
- Step dependencies form a DAG, not a flat list — `lab_binding` requires `tags_sync`, which requires `ports_alloc`, which requires `lab_resolve`. Steps declare their prerequisites explicitly.
- Ports are a LabRecord topology concern — ports are allocated to a LabRecord when it is created/imported and stored as `allocated_ports` on the LabRecord aggregate. They are part of the topology graph (nodes, edges, tags) and persist across start/stop/wipe. LabRunRecord tracks runtime (start/stop/duration) but does NOT own ports. The LabletSession gets a denormalized copy via binding.
- CML node tags must be synced with allocated ports — after allocating real port numbers, the `tags_sync` step writes them back to CML node tags via `PATCH /api/v0/labs/{lab_id}/nodes/{node_id}`. Tags encode `protocol:port` pairs and are consumed by CML for port-forwarding when the lab starts. Stop/wipe do NOT modify tags.
- Timeslot-centric lifecycle — a LabletSession maps 1:1 to a timeslot. The timeslot can be extended/started/stopped/cancelled. Expired session sync triggers downstream cleanup (UserSession terminated, LabRecord unbound, reports published).
- Option A (inline) for now — pipeline steps are inline methods in the reconciler. Future: configurable per LabletDefinition.
User Requirements Addressed¶
| # | Requirement | Section |
|---|---|---|
| 1 | Port allocation at LabRecord creation (not scheduling); scheduler only considers availability | §3 |
| 2 | LabRecord as port owner; CML node tags synced with allocated ports | §3, §4 |
| 3 | Lab binding after lab resolution and tag sync; LabRunRecord tracks runtime only | §4 |
| 4 | Variables as placeholder step | §5 |
| 5 | Content sync as fail-fast prerequisite | §6 |
| 6 | Timeslot-centric lifecycle with downstream cleanup | §7 |
| 7 | Cross-service refactor (all 4 services) | §9 |
| 8 | Fix drain-loop first, then design pipeline | §1, §2 |
Table of Contents¶
- Prerequisites & Low-Hanging Fruit
- Pipeline Architecture
- Port Allocation — LabRecord-Level Concern
- Lab Binding & LabRunRecord
- Variables Step
- Content Sync Prerequisite
- Timeslot Lifecycle & Downstream Cleanup
- Pipeline UI (Phase 5)
- Cross-Service Implementation Plan
- Migration & Backward Compatibility
1. Prerequisites & Low-Hanging Fruit¶
1.1 Drain-Loop Fix (AD-DRAIN-001) — DONE¶
Problem: In watch-only mode (polling disabled), when _handle_instantiating() makes an API call that changes session state (e.g., SCHEDULED → INSTANTIATING), the resulting etcd watch event arrives during execution. The old _debounced_reconcile() checked `if not self._pending_reconciles: return` — discarding the self-induced event. The session would be stuck until the next polling cycle (disabled in production).
Fix: Replaced the single-pass drain with a `while self._pending_reconciles:` drain loop in WatchTriggeredHostedService._debounced_reconcile(). Self-induced events are now re-drained and processed within the same debounce cycle.
Status: ✅ Implemented, 10 tests passing, all 103 core tests green.
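The mechanics can be shown with a minimal, self-contained sketch (hypothetical class; the real WatchTriggeredHostedService adds debouncing and etcd wiring). The first reconcile raises a self-induced watch event, and the `while` drain loop processes it in the same call instead of dropping it:

```python
import asyncio

class DrainLoopReconciler:
    """Toy model of the drain-loop fix (hypothetical class name)."""

    def __init__(self) -> None:
        self._pending_reconciles: set[str] = set()
        self.cycles: list[str] = []  # record of every reconcile performed

    def on_watch_event(self, session_id: str) -> None:
        """Watch callback — only records that a reconcile is needed."""
        self._pending_reconciles.add(session_id)

    async def _debounced_reconcile(self) -> None:
        # OLD (buggy): a single pass guarded by
        #   if not self._pending_reconciles: return
        # dropped events raised by our own API calls mid-cycle.
        # NEW: keep draining until no pending events remain.
        while self._pending_reconciles:
            batch, self._pending_reconciles = self._pending_reconciles, set()
            for session_id in sorted(batch):
                self.cycles.append(session_id)
                if len(self.cycles) == 1:
                    # Simulate a step whose API call triggers a self-induced
                    # watch event while we are still reconciling.
                    self.on_watch_event(session_id)

r = DrainLoopReconciler()
r.on_watch_event("session-1")
asyncio.run(r._debounced_reconcile())
print(r.cycles)  # ['session-1', 'session-1'] — self-induced event handled in the same cycle
```

With the old single-pass guard, the second entry would only appear on the next polling cycle, which production disables.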
1.2 Why Checkpoint-Based Design¶
The drain-loop fix enables multi-step advancement per reconciliation cycle. Combined with checkpoint persistence, each step becomes:
```text
reconcile(session) →
  read checkpoint →
  skip completed steps →
  execute next step →
  persist checkpoint →
  return requeue →
[drain-loop picks up self-induced event] →
reconcile(session) →
  read checkpoint →
  skip completed steps →
  execute next step →
  ... until READY
```
This gives us resilience (restart from last checkpoint), visibility (frontend can show step progress), and flexibility (pipeline steps can be customized per LabletDefinition).
2. Pipeline Architecture¶
2.1 Step Dependency Graph¶
The pipeline is not a flat list — steps have explicit prerequisites. The reconciler evaluates which step to execute next by checking whether all prerequisites are satisfied (completed or skipped).
```text
INSTANTIATING entered
         │
   ┌─────┴──────┐
   ▼            ▼
┌────────────┐ ┌───────────┐
│content_sync│ │ variables │
│     §6     │ │    §5     │
└─────┬──────┘ └─────┬─────┘
      └───────┬──────┘
              ▼
      ┌──────────────┐
      │ lab_resolve  │
      │ reuse/import │
      └──────┬───────┘
             ▼
      ┌──────────────┐
      │ ports_alloc  │ §3
      │ allocate on  │
      │ LabRecord    │
      └──────┬───────┘
             ▼
      ┌──────────────┐
      │ tags_sync    │ §3
      │ write tags   │
      │ to CML nodes │
      └──────┬───────┘
             ▼
      ┌──────────────┐
      │ lab_binding  │ §4
      │ bind record  │
      │ to session   │
      └──────┬───────┘
             ▼
      ┌──────────────┐
      │ lab_start    │
      │ CML start    │
      │ → BOOTED     │
      └──────┬───────┘
             ▼
      ┌───────────────┐
      │ lds_provision │
      │ LDS session   │
      │ + device map  │
      └──────┬────────┘
             ▼
      ┌──────────────┐
      │ mark_ready   │
      └──────────────┘
```
2.2 Step Definitions with Prerequisites¶
| Step | Prerequisites | Gate Condition | Action | Skip Condition |
|---|---|---|---|---|
| `content_sync` | — (root) | Definition exists | Verify `sync_status == "synced"` | `content_sync_enabled == False` |
| `variables` | — (root) | Definition exists | Resolve variables from definition defaults | No variables defined (placeholder) |
| `lab_resolve` | `content_sync`, `variables` | Topology YAML available | Find reusable lab or import fresh → yields `cml_lab_id` | — |
| `ports_alloc` | `lab_resolve` | `cml_lab_id` resolved, `port_template` exists | Allocate ports from worker pool via PortAllocationService → stored on LabRecord | No `port_template` on definition |
| `tags_sync` | `ports_alloc` | Ports allocated (or skipped) | Write allocated port numbers to CML node tags via PATCH API | No `port_template` (no ports to sync) |
| `lab_binding` | `lab_resolve`, `tags_sync` | Lab resolved, tags synced (or skipped) | Bind LabRecord to session, create LabRunRecord | — |
| `lab_start` | `lab_binding` | LabRecord bound to session | Start CML lab, poll until BOOTED | Lab already BOOTED (reuse case) |
| `lds_provision` | `lab_start` | Lab BOOTED | Create LDS session, map devices, get launch URL | No `form_qualified_name` (no LDS) |
| `mark_ready` | `lds_provision` | LDS provisioned (or skipped) | Atomic transition to READY | — (always runs) |
2.3 InstantiationProgress Model¶
A new instantiation_progress field on LabletSessionState tracks pipeline state:
```python
# domain/value_objects/instantiation_progress.py (NEW — control-plane-api)
from dataclasses import dataclass, field
from datetime import datetime, timezone


@dataclass
class StepResult:
    """Result of a single pipeline step."""

    step: str                                           # Step name (e.g., "content_sync", "ports_alloc")
    status: str = "pending"                             # "pending" | "completed" | "failed" | "skipped"
    requires: list[str] = field(default_factory=list)   # Prerequisite step names
    completed_at: datetime | None = None                # When the step completed
    result_data: dict | None = None                     # Step-specific evidence (e.g., {"cml_lab_id": "..."})
    error: str | None = None                            # Error message if failed
    attempt_count: int = 0                              # Number of attempts (for retry tracking)


@dataclass
class InstantiationProgress:
    """Tracks the instantiation pipeline state."""

    steps: list[StepResult]         # All step results (dependency-ordered)
    started_at: datetime            # When the pipeline started
    current_step: str | None        # Currently executing step
    completed_at: datetime | None   # When the pipeline completed (all steps done)
    pipeline_version: str           # Pipeline definition version (for migration)

    @property
    def is_complete(self) -> bool:
        return all(s.status in ("completed", "skipped") for s in self.steps)

    def next_executable_step(self) -> StepResult | None:
        """Find the next step whose prerequisites are all satisfied."""
        completed = {s.step for s in self.steps if s.status in ("completed", "skipped")}
        for step in self.steps:
            if step.status == "pending" and all(r in completed for r in step.requires):
                return step
        return None

    def get_step(self, name: str) -> StepResult | None:
        return next((s for s in self.steps if s.step == name), None)

    def complete_step(self, name: str, result_data: dict | None = None) -> None:
        step = self.get_step(name)
        if step:
            step.status = "completed"
            step.completed_at = datetime.now(timezone.utc)
            step.result_data = result_data
            step.attempt_count += 1

    def fail_step(self, name: str, error: str) -> None:
        step = self.get_step(name)
        if step:
            step.status = "failed"
            step.error = error
            step.attempt_count += 1
```
The next_executable_step() method is the key difference from v1 — instead of iterating a flat list, it checks the dependency graph to find the next step whose prerequisites are all satisfied.
2.4 Default Pipeline Factory¶
```python
def build_default_pipeline(definition: LabletDefinitionReadModel) -> InstantiationProgress:
    """Build the default pipeline for a definition.

    Future: LabletDefinition can override steps / skip conditions.
    """
    has_ports = bool(definition.port_template)
    return InstantiationProgress(
        steps=[
            StepResult(step="content_sync", requires=[],
                       status="skipped" if not definition.content_sync_enabled else "pending"),
            StepResult(step="variables", requires=[], status="skipped"),  # placeholder
            StepResult(step="lab_resolve", requires=["content_sync", "variables"], status="pending"),
            StepResult(step="ports_alloc", requires=["lab_resolve"],
                       status="skipped" if not has_ports else "pending"),
            StepResult(step="tags_sync", requires=["ports_alloc"],
                       status="skipped" if not has_ports else "pending"),
            StepResult(step="lab_binding", requires=["lab_resolve", "tags_sync"], status="pending"),
            StepResult(step="lab_start", requires=["lab_binding"], status="pending"),
            StepResult(step="lds_provision", requires=["lab_start"],
                       status="skipped" if not definition.form_qualified_name else "pending"),
            StepResult(step="mark_ready", requires=["lds_provision"], status="pending"),
        ],
        started_at=datetime.now(timezone.utc),
        current_step=None,
        completed_at=None,
        pipeline_version="1.0",
    )
```
2.5 Domain Event: InstantiationProgressUpdated¶
```python
# domain/events/lablet_session_events.py (MODIFY)
@dataclass
class LabletSessionInstantiationProgressUpdatedDomainEvent(LabletSessionDomainEventBase):
    """Published when a pipeline step completes."""

    type: str = "com.cisco.lcm.lablet-session.instantiation-progress-updated.v1"
    step_name: str = ""
    step_status: str = ""  # "completed" | "failed" | "skipped"
    progress_data: dict = field(default_factory=dict)
```
2.6 New CPA Internal Endpoint¶
```python
# api/controllers/internal/lablet_session_internal_controller.py (MODIFY)
# POST /api/internal/lablet-sessions/{session_id}/instantiation-progress
# Body: {"step_name": "ports_alloc", "step_status": "completed", "result_data": {...}}
# Called by: lablet-controller after each pipeline step
```
2.7 Reconciler Integration (Option A — Inline)¶
```python
# lablet_reconciler.py — _handle_instantiating (revised)
async def _handle_instantiating(self, instance: LabletSessionReadModel) -> ReconciliationResult:
    """Handle INSTANTIATING — execute the next pipeline step."""
    # Build or resume progress
    progress = self._parse_progress(instance) or self._build_default_progress(instance)
    if progress.is_complete:
        return ReconciliationResult.success()

    # Find the next executable step (prerequisites satisfied)
    next_step = progress.next_executable_step()
    if not next_step:
        # All pending steps have unsatisfied prerequisites — likely a failed dependency
        failed = [s for s in progress.steps if s.status == "failed"]
        if failed:
            return ReconciliationResult.failed(
                f"Pipeline blocked: {failed[0].step} failed — {failed[0].error}"
            )
        return ReconciliationResult.requeue("Waiting for prerequisites")

    # Dispatch to the step handler
    handler = getattr(self, f"_step_{next_step.step}", None)
    if not handler:
        return ReconciliationResult.failed(f"Unknown pipeline step: {next_step.step}")

    try:
        result = await handler(instance, progress)
    except Exception as e:
        result = StepResult(step=next_step.step, status="failed", error=str(e))

    # Persist progress to CPA
    await self._api.update_instantiation_progress(
        session_id=instance.id,
        step_name=result.step,
        step_status=result.status,
        result_data=result.result_data,
        error=result.error,
    )
    if result.status == "failed":
        return ReconciliationResult.requeue(f"Step {result.step} failed: {result.error}")
    return ReconciliationResult.requeue(f"Step {result.step} {result.status}")
```
The step handler naming convention _step_{step_name} means adding tags_sync simply requires adding a _step_tags_sync(self, instance, progress) method to the reconciler. No dispatch map changes needed.
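That convention is plain getattr dispatch; a toy illustration (all names here are illustrative, not the production classes):

```python
import asyncio

class ToyReconciler:
    """Illustration only — handlers are discovered purely by naming convention."""

    async def _step_tags_sync(self, instance, progress):
        return "tags_sync handled"

async def dispatch_step(reconciler, step_name: str):
    # Look up `_step_{step_name}`; a missing handler is a pipeline error,
    # not an exception.
    handler = getattr(reconciler, f"_step_{step_name}", None)
    if handler is None:
        return f"unknown step: {step_name}"
    return await handler(None, None)

print(asyncio.run(dispatch_step(ToyReconciler(), "tags_sync")))  # tags_sync handled
print(asyncio.run(dispatch_step(ToyReconciler(), "bogus")))      # unknown step: bogus
```

Because the step name in the pipeline definition is the single source of truth, adding a step never touches a dispatch table.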
3. Port Allocation & Tag Sync — LabRecord Topology Concern¶
3.1 Fundamental Insight¶
Ports are not a LabletSession concern and not a LabRunRecord concern — they are a LabRecord topology concern. CML node tags encode protocol:port pairs that are part of the lab graph (nodes, edges, tags). Tags persist across start/stop/wipe cycles — they are set once when the lab is created/imported and only updated when ports are re-allocated.
Port lifecycle:
- LabRecord created/imported → ports allocated from worker pool → stored as `LabRecord.allocated_ports`
- Tags synced to CML nodes → `PATCH /api/v0/labs/{lab_id}/nodes/{node_id}` writes `["protocol:port", ...]`
- Lab started → CML reads tags for port-forwarding → ports are active
- Lab stopped / wiped → tags unchanged (topology-level, they persist)
- Lab reused (next session) → same LabRecord, same ports, same tags → no re-allocation
- LabRecord deleted → ports released from etcd → available for reuse
3.2 Current State (Problems)¶
| Component | Current Behavior | Problem |
|---|---|---|
| resource-scheduler `_handle_assign()` | Builds `allocated_ports` from static `port_template.port_entries` | Static template values ≠ dynamically allocated ports |
| resource-scheduler PlacementEngine | Only counts available ports (filter: need N, have M) | ✅ Correct — this is scheduling, not allocation |
| control-plane-api PortAllocationService | Full etcd-based allocation with atomic guarantees | Exists but never called during scheduling! |
| LabletSession.allocated_ports | Stores whatever dict is passed at schedule time | Stores static template values; should be denormalized from LabRecord |
| LabRunRecord | Has `lablet_session_id` but no port mapping | ✅ Correct — LabRunRecord tracks runtime (start/stop/duration), not topology |
| CML node tags | Contain placeholder port numbers from YAML authoring time | Never updated with allocated ports — critical gap |
| CML SPI client | Only reads nodes | No PATCH node endpoint — cannot write tags |
| Lab discovery | Discovers labs, extracts ExternalInterface VOs | No port allocation tracking for non-session labs |
3.3 Port Ownership Model¶
```text
┌──────────────────────┐
│   LabletDefinition   │
│   port_template:     │
│    [{name, protocol}]│
└──────────┬───────────┘
           │ "what ports are needed"
           ▼
┌──────────────────────┐
│ PortAllocationService│
│ (etcd-backed, atomic)│
│  allocate(worker,    │
│    lab_record_id,    │
│    port_template)    │
│  → {name: port_num}  │
└──────────┬───────────┘
           │ "actual port numbers"
           ▼
┌──────────────────────┐
│ LabRecord            │
│  allocated_ports: ◄──┼── canonical owner
│   {name: port_num}   │
│  external_interfaces◄┼── derived from allocated_ports
└──────────┬───────────┘
           │
   ┌───────┼────────────────────┐
   ▼       │                    ▼
┌──────────────────┐   ┌──────────────────┐
│ CML Nodes        │   │ LabletSession    │
│  tags: [         │   │  allocated_ports:│◄── denormalized
│   "serial:3001", │   │  (from LabRecord)│    copy via binding
│   "vnc:3002"     │   └──────────────────┘
│  ]               │
└──────────────────┘
           │
           ▼
┌──────────────────────┐
│ LabRunRecord         │
│  (runtime only)      │
│  started_at          │
│  stopped_at          │
│  lablet_session_id   │
│  final_state         │
│  (NO port fields)    │
└──────────────────────┘
```
3.4 LabRecord Gains allocated_ports¶
```python
# domain/entities/lab_record.py (MODIFY — LabRecordState)
# Add field:
allocated_ports: dict[str, int] | None  # {port_name: port_number}, e.g. {"PC_serial": 3001, "PC_vnc": 3002}


# New domain event:
@dataclass
class LabRecordPortsAllocatedDomainEvent(LabRecordDomainEventBase):
    """Published when ports are allocated to a LabRecord."""

    type: str = "com.cisco.lcm.lab-record.ports-allocated.v1"
    allocated_ports: dict = field(default_factory=dict)


# New method on the LabRecord aggregate:
def allocate_ports(self, allocated_ports: dict[str, int]) -> None:
    """Record port allocation on this lab record."""
    self.record_event(LabRecordPortsAllocatedDomainEvent(
        aggregate_id=self.id(),
        allocated_ports=allocated_ports,
    ))


# @dispatch handler:
@dispatch(LabRecordPortsAllocatedDomainEvent)
def on(self, event: LabRecordPortsAllocatedDomainEvent) -> None:
    self.allocated_ports = dict(event.allocated_ports)
```
3.5 LabRunRecord: NO Port Fields¶
LabRunRecord tracks runtime only — when a lab ran, for how long, and for which session. It does NOT own ports. This is unchanged from the current codebase:
```python
# domain/value_objects/lab_run_record.py — NO CHANGES to fields
@dataclass(frozen=True)
class LabRunRecord:
    run_id: str
    started_at: datetime
    stopped_at: datetime | None = None
    duration_seconds: int | None = None
    started_by: str = "system"
    stop_reason: str | None = None
    lablet_session_id: str | None = None
    final_state: str | None = None
    # NO allocated_ports field — ports belong to LabRecord
```
3.6 Pipeline Step: ports_alloc¶
This step runs after lab_resolve (we need the resolved lab to know the worker and topology). It allocates ports from the worker pool and stores them on the LabRecord.
```python
async def _step_ports_alloc(self, instance, progress) -> StepResult:
    """Allocate real ports from the worker pool via the CPA PortAllocationService."""
    definition = await self._get_definition(instance.definition_id)
    if not definition or not definition.port_template:
        return StepResult(step="ports_alloc", status="skipped")

    # Get the lab_resolve result (contains lab_record_id)
    resolve_result = progress.get_step("lab_resolve")
    lab_record_id = ((resolve_result.result_data or {}).get("lab_record_id")
                     if resolve_result else None)
    if not lab_record_id:
        return StepResult(step="ports_alloc", status="failed",
                          error="No lab_record_id from lab_resolve")

    # Call the CPA internal endpoint — allocates via PortAllocationService (etcd).
    # Ports are stored on the LabRecord, keyed by lab_record_id in etcd.
    result = await self._api.allocate_lab_record_ports(
        lab_record_id=lab_record_id,
        worker_id=instance.worker_id,
    )
    # result = {"allocated_ports": {"PC_serial": 3001, "PC_vnc": 3002}}
    return StepResult(
        step="ports_alloc", status="completed",
        result_data=result,
    )
```
3.6.1 CPA Command: AllocateLabRecordPortsCommand¶
```python
# application/commands/lab_record/allocate_lab_record_ports_command.py (NEW)
@dataclass
class AllocateLabRecordPortsCommand(Command[OperationResult[dict]]):
    """Allocate ports from the worker pool and store them on the LabRecord."""

    lab_record_id: str
    worker_id: str


class AllocateLabRecordPortsCommandHandler(CommandHandlerBase, ...):
    async def handle_async(self, request, ...):
        # 1. Load the LabRecord
        lab_record = await self._lab_record_repo.get_by_id_async(request.lab_record_id)
        if not lab_record:
            return self.not_found("LabRecord", request.lab_record_id)

        # 2. Check if already allocated (idempotent)
        if lab_record.state.allocated_ports:
            return self.ok({"allocated_ports": lab_record.state.allocated_ports,
                            "already_allocated": True})

        # 3. Get the port_template from the definition
        definition = await self._definition_repo.get_by_id_async(lab_record.state.definition_id)
        if not definition or not definition.state.port_template:
            return self.ok({"allocated_ports": {}, "skipped": True})
        port_template = PortTemplate.from_dict(definition.state.port_template)

        # 4. Allocate via PortAllocationService (etcd) — keyed by lab_record_id
        result = await self._port_service.allocate_ports(
            worker_id=request.worker_id,
            session_id=request.lab_record_id,  # lab_record_id is used as the etcd key
            port_template=port_template,
        )
        if not result.success:
            return self.conflict(f"Port allocation failed: {result.error}")

        # 5. Store on the LabRecord
        lab_record.allocate_ports(result.allocated_ports)
        await self._lab_record_repo.update_async(lab_record)
        return self.ok({"allocated_ports": result.allocated_ports})
```
Note on the etcd key: The existing PortAllocationService.allocate_ports() uses session_id as its etcd key parameter. We pass lab_record_id in that parameter — semantically it is "the entity that owns these ports." The etcd key structure is ports/{worker_id}/{owner_id}. No code change is needed in PortAllocationService itself — only the caller changes which ID it passes.
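Under that convention only the owner ID changes, never the key layout. A toy helper (hypothetical function; the key format is taken from the note above):

```python
def port_allocation_key(worker_id: str, owner_id: str) -> str:
    """Build the etcd key under which a port allocation is stored.

    The owner used to be a session_id; the pipeline now passes a
    lab_record_id — the key shape itself is unchanged.
    """
    return f"ports/{worker_id}/{owner_id}"

# Session-owned (old) vs. LabRecord-owned (new) — same layout:
print(port_allocation_key("worker-1", "session-abc"))  # ports/worker-1/session-abc
print(port_allocation_key("worker-1", "labrec-42"))    # ports/worker-1/labrec-42
```

Keeping one key shape is what lets the migration avoid touching PortAllocationService internals.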
3.7 Pipeline Step: tags_sync¶
After ports are allocated, write them to the CML node tags. This step requires a new CML SPI method to patch node tags.
```python
async def _step_tags_sync(self, instance, progress) -> StepResult:
    """Write allocated port numbers to CML node tags via the PATCH API."""
    ports_step = progress.get_step("ports_alloc")
    if not ports_step or ports_step.status != "completed":
        return StepResult(step="tags_sync", status="skipped")
    allocated_ports = (ports_step.result_data or {}).get("allocated_ports", {})
    if not allocated_ports:
        return StepResult(step="tags_sync", status="skipped")

    resolve_step = progress.get_step("lab_resolve")
    cml_lab_id = (resolve_step.result_data or {}).get("cml_lab_id") if resolve_step else None
    if not cml_lab_id:
        return StepResult(step="tags_sync", status="failed", error="No cml_lab_id from lab_resolve")

    # Group allocated ports by node label.
    # Port names follow the convention "{node_label}_{protocol}" (from PortTemplate.from_cml_nodes).
    node_tags: dict[str, list[str]] = {}  # {node_label: ["protocol:port", ...]}
    for port_name, port_number in allocated_ports.items():
        # Parse "{node_label}_{protocol}" → (node_label, protocol)
        parts = port_name.rsplit("_", 1)
        if len(parts) != 2:
            continue
        node_label, protocol = parts
        tag = f"{protocol}:{port_number}"
        node_tags.setdefault(node_label, []).append(tag)

    # Get the CML lab nodes to find node IDs
    nodes = await self._cml_labs.get_lab_nodes(
        worker_id=instance.worker_id,
        lab_id=cml_lab_id,
    )

    # Write tags to each node via PATCH
    synced_nodes = []
    for node in nodes:
        node_label = node.get("label", "")
        safe_label = re.sub(r"[^a-zA-Z0-9_-]", "_", node_label)
        if safe_label in node_tags:
            await self._cml_labs.patch_node_tags(
                worker_id=instance.worker_id,
                lab_id=cml_lab_id,
                node_id=node["id"],
                tags=node_tags[safe_label],
            )
            synced_nodes.append(node_label)

    return StepResult(
        step="tags_sync", status="completed",
        result_data={"synced_nodes": synced_nodes,
                     "tag_count": sum(len(t) for t in node_tags.values())},
    )
```
3.7.1 CML SPI Extension: patch_node_tags¶
The CML REST API supports PATCH /api/v0/labs/{lab_id}/nodes/{node_id} with a NodeUpdate schema that includes tags: list[str]. The SPI client needs a new method:
# integration/services/cml_labs_spi_client.py (MODIFY β lablet-controller)
async def patch_node_tags(
self, worker_id: str, lab_id: str, node_id: str, tags: list[str]
) -> dict:
"""Update tags on a CML lab node.
Uses PATCH /api/v0/labs/{lab_id}/nodes/{node_id} with NodeUpdate schema.
Tags are fully replaced (not merged) β caller must provide the complete tag list.
Args:
worker_id: CML worker hosting the lab
lab_id: CML lab ID
node_id: CML node ID within the lab
tags: Complete list of tags (e.g., ["serial:3001", "vnc:3002"])
Returns:
Updated node dict from CML API
"""
worker = await self._get_worker(worker_id)
url = f"{worker.endpoint}/api/v0/labs/{lab_id}/nodes/{node_id}"
payload = {"tags": tags}
response = await self._session.patch(url, json=payload, headers=self._auth_headers(worker))
response.raise_for_status()
return response.json()
3.8 Scheduler Changes¶
The scheduler stops passing static port values. It only assigns the worker:
```python
# BEFORE (scheduler_hosted_service.py _handle_assign):
allocated_ports = {entry["name"]: entry["port"] for entry in port_entries}
await self._api.schedule_session(..., allocated_ports=allocated_ports, ...)

# AFTER:
# The scheduler passes EMPTY ports — just the worker assignment
await self._api.schedule_session(..., allocated_ports={}, ...)
```
The PlacementEngine's port count filter (Stage 5) remains unchanged — it correctly checks availability without allocating.
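Conceptually, that Stage 5 filter reduces to a pure capacity predicate (a hypothetical helper, not the actual PlacementEngine code):

```python
def worker_has_port_capacity(needed: int, pool_size: int, allocated: int) -> bool:
    """Scheduling-time predicate: does the worker still have `needed` free ports?

    This only COUNTS availability — concrete port numbers are assigned
    later, by the ports_alloc pipeline step.
    """
    return pool_size - allocated >= needed

print(worker_has_port_capacity(needed=2, pool_size=100, allocated=97))  # True
print(worker_has_port_capacity(needed=4, pool_size=100, allocated=97))  # False
```

Separating "can this worker host N ports?" (scheduling) from "which ports?" (instantiation) is the core of the design change in this section.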
3.9 Lab Discovery Port Tracking¶
When LabDiscoveryService discovers a STARTED/BOOTED lab without a session, the CML node tags already contain actual port mappings. These should be registered in etcd to prevent allocation conflicts:
```python
# In lab_discovery_service.py — when creating/updating a discovered lab:
# ExternalInterfaces are already extracted from CML node tags.
# Record the ports on the LabRecord and register them in etcd.
lab_record.allocate_ports(discovered_port_mapping)  # from ExternalInterface VOs
await self._api.register_discovered_ports(
    worker_id=worker_id,
    lab_record_id=lab_record_id,
    ports=discovered_port_mapping,
)
```
This ensures the PortAllocationService's etcd state reflects all active port consumers, not just session-initiated ones.
3.10 Port Release Lifecycle¶
Ports are released when a LabRecord is deleted or recycled — NOT when a session expires. This is critical for lab reuse:
| Event | Port Action | Why |
|---|---|---|
| Session expires / stops | No port release | LabRecord may be reused by next session |
| Lab wiped | No port release | Tags persist; same ports reused on next start |
| Lab stopped | No port release | Same ports when restarted |
| LabRecord deleted | Release ports from etcd | LabRecord gone → ports available |
| LabRecord unbound + recycled (definition changed) | Release + re-allocate | New topology may need different ports |
This differs from the previous design where ports were released at session expiry.
4. Lab Binding & LabRunRecord¶
4.1 Step Ordering Rationale¶
`lab_binding` runs after `lab_resolve` and `tags_sync` because:
- We need a resolved `cml_lab_id` to know which LabRecord to bind
- Tags must be synced before binding so the LabRecord's port state is consistent
- The LabRunRecord captures runtime context (who started it, when, for which session) — it does NOT carry port data
4.2 Current State (Problems)¶
| Component | Current Behavior | Problem |
|---|---|---|
| LabRecord.bind_to_lablet() | Emits `LabBoundToLabletDomainEvent` | Event handler is a no-op (`pass`). No binding state persisted. |
| LabletSession.schedule() | Accepts a `lab_record_id` parameter | Stored as a simple string FK — no validation at schedule time. |
| SchedulerHostedService._handle_assign() | Passes `lab_record_id = definition.get("lab_record_id", "")` | Empty string — the scheduler doesn't resolve lab records. |
| LabletReconciler._resolve_lab_for_instance() | Finds/imports labs at instantiation | Creates the CML lab but doesn't create a LabRunRecord or update `lab_record_id` on the session. |
4.3 Proposed Design¶
4.3.1 Pipeline Step: lab_binding¶
After lab_resolve provides a cml_lab_id and tags_sync has written port tags to CML nodes, bind the LabRecord to the session:
```python
async def _step_lab_binding(self, instance, progress) -> StepResult:
    """Bind the LabRecord to the session and create a LabRunRecord."""
    # Get the lab_resolve result (contains cml_lab_id and lab_record_id)
    resolve_result = progress.get_step("lab_resolve")
    resolve_data = (resolve_result.result_data or {}) if resolve_result else {}
    cml_lab_id = resolve_data.get("cml_lab_id")
    lab_record_id = resolve_data.get("lab_record_id")
    if not cml_lab_id or not lab_record_id:
        return StepResult(step="lab_binding", status="failed",
                          error="No cml_lab_id/lab_record_id from lab_resolve")

    # Call the CPA internal endpoint to:
    # 1. Bind the LabRecord to the session (set active_lablet_session_id)
    # 2. Create a LabRunRecord (runtime tracking)
    # 3. Denormalize LabRecord.allocated_ports onto the LabletSession
    result = await self._api.bind_lab_to_session(
        session_id=instance.id,
        worker_id=instance.worker_id,
        lab_record_id=lab_record_id,
    )
    return StepResult(
        step="lab_binding", status="completed",
        result_data=result,  # {"lab_record_id": "...", "run_id": "...", "allocated_ports": {...}}
    )
```
4.3.2 CPA Command: BindLabToSessionCommand¶
```python
# application/commands/lablet_session/bind_lab_to_session_command.py (NEW)
@dataclass
class BindLabToSessionCommand(Command[OperationResult[dict]]):
    """Bind a LabRecord to a session and create a LabRunRecord."""

    session_id: str
    worker_id: str
    lab_record_id: str
    # NO allocated_ports — ports are already on the LabRecord


class BindLabToSessionCommandHandler(CommandHandlerBase, ...):
    async def handle_async(self, request, ...):
        # 1. Load the LabRecord
        lab_record = await self._lab_record_repo.get_by_id_async(request.lab_record_id)
        if not lab_record:
            return self.not_found("LabRecord", request.lab_record_id)

        # 2. Create a LabRunRecord (runtime tracking — NO port fields)
        run = LabRunRecord(
            run_id=str(uuid4()),
            started_at=datetime.now(timezone.utc),
            started_by="lablet-controller",
            lablet_session_id=request.session_id,
        )
        lab_record.record_run(run)

        # 3. Bind the LabRecord to the session
        lab_record.bind_to_lablet(
            lablet_session_id=request.session_id,
            binding_id=run.run_id,
            binding_role="instantiation",
        )
        await self._lab_record_repo.update_async(lab_record)

        # 4. Denormalize LabRecord.allocated_ports onto the LabletSession
        session = await self._session_repo.get_by_id_async(request.session_id)
        session.bind_lab_record(lab_record.id())
        allocated_ports = lab_record.state.allocated_ports or {}
        if allocated_ports:
            session.update_allocated_ports(allocated_ports)
        await self._session_repo.update_async(session)

        return self.ok({
            "lab_record_id": lab_record.id(),
            "run_id": run.run_id,
            "allocated_ports": allocated_ports,  # Returned for caller visibility
        })
```
4.3.3 Fix LabRecord Bind Event Handler¶
The current handler is a no-op (pass). It should track the active binding:
```python
# domain/entities/lab_record.py (MODIFY)
# Add fields to LabRecordState:
active_lablet_session_id: str | None  # Currently bound session (if any)
active_binding_id: str | None         # Current binding ID (LabRunRecord.run_id)


@dispatch(LabRecordBoundToLabletDomainEvent)
def on(self, event: LabRecordBoundToLabletDomainEvent) -> None:
    """Apply the binding event — records that a lablet session is using this lab."""
    self.active_lablet_session_id = event.lablet_session_id
    self.active_binding_id = event.binding_id


@dispatch(LabRecordUnboundFromLabletDomainEvent)
def on(self, event: LabRecordUnboundFromLabletDomainEvent) -> None:
    """Apply the unbinding event."""
    if self.active_binding_id == event.binding_id:
        self.active_lablet_session_id = None
        self.active_binding_id = None
```
4.4 Scheduler: No Lab Binding at Schedule Time¶
The scheduler continues to pass `lab_record_id=""` — lab records are resolved and bound during instantiation, not scheduling. The ScheduleLabletSessionCommand already handles an empty lab_record_id.
5. Variables Step¶
5.1 Purpose¶
A placeholder step for future integration with a variable resolution service. Variables would be resolved from:
- User-provided values at session creation time
- Definition-level defaults
- Environment-level overrides (dev/staging/prod)
- Dynamic values computed at instantiation (e.g., credentials, connection strings)
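If and when those sources are combined, a layered merge is the likely shape. The precedence order below is an assumption for illustration, not a committed design:

```python
def resolve_variables(defaults: dict, env_overrides: dict,
                      user_values: dict, dynamic: dict) -> dict:
    """Layered merge — later layers win on key collisions.

    Assumed precedence (lowest to highest): definition defaults,
    environment overrides, user-provided values, dynamic values.
    """
    resolved: dict = {}
    for layer in (defaults, env_overrides, user_values, dynamic):
        resolved.update(layer)
    return resolved

print(resolve_variables(
    defaults={"ntp": "pool.ntp.org", "domain": "lab.local"},
    env_overrides={"ntp": "10.0.0.1"},
    user_values={"domain": "student1.lab"},
    dynamic={"password": "generated-at-instantiation"},
))
# {'ntp': '10.0.0.1', 'domain': 'student1.lab', 'password': 'generated-at-instantiation'}
```

Whatever the final precedence, the merge result is what the `variables` step would record in `result_data` for downstream steps to consume.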
5.2 Implementation (Placeholder)¶
```python
async def _step_resolve_variables(self, instance: LabletSessionReadModel) -> StepResult:
    """Resolve session variables. Currently a no-op placeholder."""
    # Gate: the definition has variables defined
    definition = await self._get_definition(instance.definition_id)
    variables = getattr(definition, "variables", None)
    if not variables:
        return StepResult(step="variables", status="skipped")

    # Future: call a variable resolution service.
    # For now, pass through the definition defaults.
    resolved = {var.name: var.default_value for var in variables if var.default_value}
    return StepResult(
        step="variables",
        status="completed",
        result_data={"resolved_variables": resolved},
    )
```
5.3 Future Extension Points¶
When a variable resolution service is available:
- Add VariableResolutionSpiClient to lablet-controller
- Call the SPI with (definition_id, session_id, context) during this step
- Store resolved variables on LabletSession (new resolved_variables field)
- Downstream steps (lab_resolve, lds_provision) consume resolved variables
6. Content Sync Prerequisite¶
6.1 Purpose¶
Before attempting to instantiate a lab, verify that the definition's content package is synchronized and available. This is a fail-fast check: if content is not synced, there's no point importing a lab (it will fail anyway, because LDS provisioning requires the form_qualified_name and content).
6.2 Implementation¶
async def _step_verify_content_sync(self, instance: LabletSessionReadModel) -> StepResult:
    """Verify definition content is synced and available."""
    definition = await self._get_definition(instance.definition_id)
    if not definition:
        return StepResult(step="content_sync", status="failed", error="Definition not found")
    # Check if content sync is relevant for this definition
    if not getattr(definition, 'content_sync_enabled', False):
        return StepResult(step="content_sync", status="skipped")
    # Check sync status
    sync_status = getattr(definition, 'sync_status', None)
    if sync_status == "synced":
        return StepResult(
            step="content_sync",
            status="completed",
            result_data={
                "sync_status": sync_status,
                "form_qualified_name": definition.form_qualified_name,
            },
        )
    # Not synced: trigger sync and fail this step (will retry on next reconcile)
    if sync_status in (None, "not_synced", "sync_failed"):
        # Optionally trigger a sync request
        try:
            await self._content_sync_service.request_sync(definition.id)
        except Exception as e:
            logger.warning(f"Could not trigger content sync for {definition.id}: {e}")
        return StepResult(
            step="content_sync",
            status="failed",
            error=f"Content not synced (status: {sync_status}). Sync requested.",
        )
    # sync_requested or syncing: wait
    return StepResult(
        step="content_sync",
        status="failed",
        error=f"Content sync in progress (status: {sync_status}). Waiting.",
    )
6.3 Interaction with ContentSyncService¶
The content_sync step doesn't perform the sync itself – it delegates to the existing ContentSyncService (if needed) and checks the sync_status field on the definition read model. The reconciler will naturally retry failed steps on the next cycle.
7. Timeslot Lifecycle & Downstream Cleanup¶
7.1 Timeslot as Lifecycle Container¶
A LabletSession maps 1:1 to a timeslot (timeslot_start, timeslot_end). The timeslot defines when the session's resources are active. All lifecycle operations happen within the timeslot boundary.
Timeslot operations (already on LabletSession):
| Operation | Method | Behavior |
|---|---|---|
| Extend | extend_timeslot(new_end) | Move timeslot_end forward → extends active window |
| Start | start() | Transition SCHEDULED → INSTANTIATING (reconciler picks it up) |
| Stop | stop() / release_capacity() | Graceful stop → releases worker capacity |
| Cancel | cancel() | Cancel before start → no resources to clean up |
| Requeue | requeue() | Re-enter scheduling queue (e.g., after worker failure) |
Not implemented (and not needed per current design): pause/resume. The session state machine is forward-only: once started, a session progresses toward completion or failure.
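The forward-only lifecycle can be sketched as a transition table. This is illustrative only: the state names come from this document, but the exact edge set (for example, where requeue() returns to) is an assumption, and the real aggregate enforces transitions inside its command methods:

```python
# Illustrative sketch of the forward-only LabletSession lifecycle.
# ASSUMPTION: the precise edges (notably requeue's target) are inferred,
# not confirmed by the source document.
ALLOWED_TRANSITIONS: dict[str, set[str]] = {
    "PENDING": {"SCHEDULED", "CANCELLED"},
    "SCHEDULED": {"INSTANTIATING", "CANCELLED", "PENDING"},  # requeue() re-enters the queue
    "INSTANTIATING": {"READY", "FAILED", "EXPIRED"},
    "READY": {"RUNNING", "STOPPING", "EXPIRED", "FAILED"},
    "RUNNING": {"STOPPING", "EXPIRED", "FAILED"},
    "STOPPING": {"COMPLETED", "FAILED"},
    # Terminal states: no outgoing edges, and no pause/resume anywhere.
    "COMPLETED": set(),
    "FAILED": set(),
    "CANCELLED": set(),
    "EXPIRED": set(),
}


def can_transition(current: str, target: str) -> bool:
    """True when the forward-only state machine allows current -> target."""
    return target in ALLOWED_TRANSITIONS.get(current, set())
```

The key property is that every terminal state has an empty edge set, so once a session completes, fails, or expires, no operation can move it again.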
7.2 Sync Rules¶
A LabletSession may be synced (reconciled) at any time during its timeslot. The reconciler checks the session status and acts accordingly:
| Session Status | Timeslot Status | Reconciler Action |
|---|---|---|
| INSTANTIATING | Active (inside timeslot) | Execute next pipeline step |
| INSTANTIATING | Expired (past timeslot_end) | Fail session → trigger cleanup |
| READY / RUNNING | Active | Monitor health, sync ports |
| READY / RUNNING | Expired | Expire session → trigger cleanup |
| STOPPING | Any | Continue shutdown sequence |
| COMPLETED / FAILED | Any | No action (terminal states) |
Key rule: An active session (inside its timeslot) may be synced anytime – this is the normal reconciliation path. The pipeline advances one step per reconcile cycle.
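The "one step per cycle" rule can be sketched as a small synchronous executor (the real one is async on the reconciler); step dicts, the `execute`/`persist` callables, and the return values here are illustrative, not the production API:

```python
def run_one_pipeline_cycle(steps, execute, persist) -> str:
    """Advance the pipeline by at most one step per reconcile cycle (sketch).

    steps:   step dicts in DAG order, each {"step", "status", "requires", ...}
    execute: callable(step_name) -> result dict with at least {"status": ...}
    persist: callable(steps) -> None; checkpoints progress after the step
    Returns "complete", "advanced", or "blocked".
    """
    by_name = {s["step"]: s for s in steps}
    # First pending step whose prerequisites all completed or were skipped.
    runnable = next(
        (s for s in steps
         if s["status"] == "pending"
         and all(by_name[d]["status"] in ("completed", "skipped")
                 for d in s.get("requires", []))),
        None,
    )
    if runnable is None:
        done = all(s["status"] in ("completed", "skipped") for s in steps)
        return "complete" if done else "blocked"  # blocked => failed upstream, retry next cycle
    runnable["attempt_count"] = runnable.get("attempt_count", 0) + 1
    runnable.update(execute(runnable["step"]))
    persist(steps)  # checkpoint after every step (ADR-031)
    return "advanced"
```

Each reconcile cycle calls this once; checkpointing after every step is what makes a crash or restart resume from the last completed step instead of from zero.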
7.3 Expired Session Cleanup¶
When a session's timeslot expires, the sync must handle downstream resource cleanup. Note: ports are NOT released at session expiry because they belong to the LabRecord (topology-level) and persist for lab reuse.
Session Expired
  │
  ├─ 1. Update LabletSession status → EXPIRED
  │
  ├─ 2. UserSession cleanup
  │     ├─ Terminate LDS UserSession (if provisioned)
  │     └─ Remove user access credentials
  │
  ├─ 3. LabRecord cleanup
  │     ├─ Stop the lab (if running)
  │     ├─ Wipe lab state (if configured by definition)
  │     ├─ Close LabRunRecord (set stopped_at, stop_reason="timeslot_expired")
  │     ├─ Unbind from session (clear active_lablet_session_id)
  │     ├─ LabRecord.allocated_ports UNCHANGED → ports persist for reuse
  │     └─ CML node tags UNCHANGED → topology-level, persist across cycles
  │
  ├─ 4. Capacity cleanup
  │     ├─ Release worker capacity (ReleaseCapacityCommand)
  │     └─ Worker slot becomes available for scheduling
  │
  └─ 5. Reporting
        ├─ Publish session completion report
        └─ Record usage metrics (duration, resource consumption)
Key difference from previous design: Port release is NOT part of session expiry cleanup. Ports are released only when the LabRecord itself is deleted (e.g., worker decommissioned, definition removed, manual cleanup). See §3.10 for the full port release lifecycle.
7.4 Implementation: Expiry Check in Reconciler¶
The reconciler already handles multiple session states. Add an early expiry check before status-specific handling:
async def _reconcile_instance(self, instance: LabletSessionReadModel) -> ReconciliationResult:
    """Reconcile a single LabletSession instance."""
    # Early check: timeslot expired?
    if instance.timeslot_end and datetime.now(timezone.utc) > instance.timeslot_end:
        if instance.status not in ("COMPLETED", "FAILED", "CANCELLED", "EXPIRED"):
            return await self._handle_expired(instance)
    # Normal status-based dispatch
    match instance.status:
        case "INSTANTIATING":
            return await self._handle_instantiating(instance)  # Pipeline executor
        case "READY" | "RUNNING":
            return await self._handle_active(instance)
        case "STOPPING":
            return await self._handle_stopping(instance)
        ...
7.5 Cleanup Command: ExpireLabletSessionCommand¶
# application/commands/lablet_session/expire_lablet_session_command.py (NEW – CPA)

@dataclass
class ExpireLabletSessionCommand(Command[OperationResult[dict]]):
    """Handle session expiry: update status and trigger downstream cleanup."""
    session_id: str
    reason: str = "timeslot_expired"


class ExpireLabletSessionCommandHandler(CommandHandlerBase, ...):
    async def handle_async(self, request, ...):
        session = await self._session_repo.get_by_id_async(request.session_id)
        if not session:
            return self.not_found("LabletSession", request.session_id)

        # 1. Expire the session
        session.expire(reason=request.reason)

        # 2. Close the LabRunRecord and unbind, but DO NOT release ports
        if session.state.lab_record_id:
            lab_record = await self._lab_record_repo.get_by_id_async(session.state.lab_record_id)
            if lab_record:
                lab_record.close_run(
                    lablet_session_id=request.session_id,
                    stop_reason=request.reason,
                    final_state="expired",
                )
                lab_record.unbind_from_lablet(lablet_session_id=request.session_id)
                # NOTE: lab_record.allocated_ports is UNCHANGED – ports are
                # topology-level and persist for future sessions. CML node tags
                # are also unchanged. See §3.10 for port release lifecycle.
                await self._lab_record_repo.update_async(lab_record)

        # 3. Release capacity (NOT ports – ports belong to LabRecord)
        await self.mediator.execute_async(
            ReleaseCapacityCommand(
                session_id=request.session_id,
                worker_id=session.state.worker_id,
            )
        )

        await self._session_repo.update_async(session)
        return self.ok({"session_id": request.session_id, "status": "expired"})
Key change from v2: No release_ports() call. Ports belong to LabRecord.allocated_ports (topology-level) and are not affected by session expiry. Port release occurs only on LabRecord deletion (see §3.10).
Note: UserSession (LDS) cleanup and lab stop/wipe are triggered by the lablet-controller – the CPA records the state change, and the reconciler handles the physical teardown on the next cycle (stop CML lab, delete LDS session).
8. Pipeline UI (Phase 5)¶
8.1 Current State¶
The Pipeline tab in SessionDetailsModal renders "Coming soon…". The Phase 5 plan from labletsession-ux.md specifies "dot-indicators" for pipeline stages.
8.2 Proposed UI¶
With instantiation_progress persisted on the session, the Pipeline tab can render:
Pipeline Tab (steps follow DAG order)
┌───────────────────────────────────────────────┐
│ ✅ Content Sync     completed  00:02          │
│ ⏭ Variables        skipped    –              │
│ ✅ Lab Resolution   completed  00:15          │
│ ✅ Port Allocation  completed  00:01          │
│ ✅ Tag Sync         completed  00:02          │
│ ✅ Lab Binding      completed  00:03          │
│ 🔄 Lab Start        running    01:23          │
│ ⏳ LDS Provision    pending    –              │
│ ⏳ Mark Ready       pending    –              │
└───────────────────────────────────────────────┘
Each step shows:
- Dot indicator: ✅ completed (green), 🔄 running (blue pulse), ❌ failed (red), ⏭ skipped (gray), ⏳ pending (outline)
- Step name: Human-readable label
- Duration: Time from start to completion
- Retry count: If attempt_count > 1, show "(retry N)"
- Error detail: Expandable on click if failed
8.3 Data Source¶
The Pipeline tab reads from session.instantiation_progress, which is included in the session detail DTO. The GetLabletSessionDetailQuery already returns the full session state – it just needs the new instantiation_progress field serialized.
8.4 SSE Integration¶
When LabletSessionInstantiationProgressUpdatedDomainEvent is published, the SSE relay can broadcast to subscribed clients for real-time step updates without polling.
9. Cross-Service Implementation Plan¶
Phase 1: Domain Model & CPA Foundation (control-plane-api) ✅ COMPLETE¶
Priority: HIGH | Effort: Large | Risk: LOW
Completed: 2026-03-02 | Tests: 821 passed, 0 failures | Lint: All checks passed
| # | File | Action | Status | Notes |
|---|---|---|---|---|
| 1 | domain/value_objects/instantiation_progress.py | CREATE | ✅ | InstantiationProgress, StepResult VOs with requires dependency fields (9 steps incl. tags_sync) |
| 2 | domain/entities/lab_record.py | MODIFY | ✅ | Added allocated_ports, allocate_ports(), LabRecordPortsAllocatedDomainEvent, fixed bind_to_lablet, added active_lablet_session_id / active_binding_id. Note: close_run() deferred – not needed for Phase 1 expiry flow |
| 3 | domain/events/lab_record_events.py | MODIFY | ✅ | Added LabRecordPortsAllocatedDomainEvent |
| 4 | domain/events/lablet_session_events.py | MODIFY | ✅ | Added LabletSessionInstantiationProgressUpdatedDomainEvent, LabletSessionExpiredDomainEvent, LabletSessionLabBoundDomainEvent (extra – needed by bind command) |
| 5 | domain/entities/lablet_session.py | MODIFY | ✅ | Added instantiation_progress field, update_instantiation_progress(), expire(), bind_lab() methods + @dispatch handlers |
| 6 | application/commands/lab/allocate_lab_record_ports_command.py | CREATE | ✅ | Command + handler using PortAllocationService. Note: actual path is commands/lab/ (not commands/lab_record/) per existing convention |
| 7 | application/commands/lablet_session/update_instantiation_progress_command.py | CREATE | ✅ | Command + handler with _build_default_progress() that resolves capability flags from LabletDefinition |
| 8 | application/commands/lablet_session/bind_lab_to_session_command.py | CREATE | ✅ | Command + handler – binds LabRecord to session, creates LabRunRecord, denormalizes allocated_ports |
| 9 | application/commands/lablet_session/expire_lablet_session_command.py | CREATE | ✅ | Command + handler – expiry cleanup (unbind, release capacity; NO port release). close_run() skipped (not yet on LabRecord) |
| 10 | api/controllers/internal_sessions_controller.py | MODIFY | ✅ | Added /instantiation-progress, /allocate-lab-record-ports, /bind-lab, /expire endpoints. Note: actual controller is internal_sessions_controller.py (not internal/lablet_session_internal_controller.py) |
Deviations from plan:
- close_run() on LabRecord deferred – not required for minimum viable expiry (unbind + release capacity suffices)
- Command file path: commands/lab/ not commands/lab_record/ – follows existing convention
- Controller path: api/controllers/internal_sessions_controller.py – existing controller, not a new file
- Added LabletSessionLabBoundDomainEvent (not in original plan) – needed by the bind_lab() aggregate method
- /sync-tags endpoint deferred to Phase 3 (lablet-controller side; CPA doesn't need a dedicated endpoint for it)
Phase 2: Core Read Model Enrichment (lcm-core) ✅ COMPLETE¶
Priority: HIGH | Effort: Small | Risk: LOW
Completed: 2026-03-02 | Tests: 103 passed, 0 failures | Lint: All checks passed
| # | File | Action | Status | Notes |
|---|---|---|---|---|
| 1 | lcm_core/domain/entities/read_models/lablet_session_read_model.py | MODIFY | ✅ | Added instantiation_progress: dict \| None field + from_dict() deserialization. timeslot_start/timeslot_end already present from ADR-020 |
| 2 | lcm_core/domain/entities/read_models/cml_worker_read_model.py | MODIFY | ✅ | Added allocated_port_count, available_port_count, port_utilization_pct fields + from_dict() deserialization |
| 3 | lcm_core/integration/clients/control_plane_client.py | MODIFY | ✅ | Added update_instantiation_progress(), allocate_lab_record_ports(), bind_lab_to_session(), expire_session() methods |
Deviations from plan:
- timeslot_start/timeslot_end already present on LabletSessionReadModel (added in ADR-020) – only instantiation_progress was new
- sync_lab_tags() client method deferred – lablet-controller calls the CML PATCH API directly (no CPA passthrough needed). Added to Phase 3 scope instead
Phase 3: Lablet-Controller Pipeline Refactor (lablet-controller) ✅ COMPLETE¶
Priority: HIGH | Effort: Large | Risk: MEDIUM
Completed: 2026-03-06 | Tests: 231 passed, 0 failures | Lint: All checks passed
| # | File | Action | Status | Notes |
|---|---|---|---|---|
| 1 | application/hosted_services/lablet_reconciler.py | MODIFY | ✅ | Added early timeslot expiry check in reconcile(). Replaced monolithic _handle_instantiating() with DAG-based pipeline executor (Option A: inline methods). Added _handle_expired() method. Added 4 static pipeline helpers: _build_default_progress(), _next_executable_step(), _is_pipeline_complete(), _get_step_result_data() |
| 2 | (9 step methods within reconciler) | ADD | ✅ | _step_content_sync(), _step_variables(), _step_lab_resolve(), _step_ports_alloc(), _step_tags_sync(), _step_lab_binding(), _step_lab_start(), _step_lds_provision(), _step_mark_ready() – all return plain dicts (not CPA VOs) |
| 3 | integration/services/cml_labs_spi.py | MODIFY | ✅ | Added patch_node_tags(host, lab_id, node_id, tags, username, password) – calls PATCH /api/v0/labs/{lab_id}/nodes/{node_id} with {"tags": tags}. Used by _step_tags_sync() |
| 4 | tests/test_instantiation_pipeline.py | CREATE | ✅ | 53 tests: pipeline helpers (DAG resolution, default progress, completion check), step methods (content_sync, variables, ports_alloc, tags_sync, lab_binding, lds_provision, mark_ready), pipeline executor (bootstrap, dispatch, error handling), timeslot expiry |
| 5 | tests/test_lablet_reconciler_g5.py | MODIFY | ✅ | Updated 11 tests for pipeline-based flow. Fixed get_nodes → get_lab_nodes mock references. Added _progress_with_lab_resolve() helper. Added instantiation_progress to make_instance() |
| 6 | tests/test_phase9_lab_discovery.py | MODIFY | ✅ | Updated 4 tests in TestHandleInstantiatingWithResolution to test pipeline steps directly. Added _content_sync_service, _worker_cache, _resource_observer to make_reconciler() |
Deviations from plan:
- Step methods return plain dicts (not StepResult VOs from CPA domain) – lablet-controller does not import CPA domain classes; pipeline progress on the read model is a raw dict[str, Any]
- Pipeline dispatch uses the getattr(self, f"_step_{step_name}") naming convention instead of an explicit dict mapping – cleaner, auto-discovers new steps
- _provision_lds_session() kept for backward compatibility – _step_lds_provision() delegates to it
- Fixed pre-existing bug: get_nodes() → get_lab_nodes() in _provision_lds_session()
- _step_tags_sync treats tag sync failures as non-fatal (AD-TAGS-001) – logs a warning and marks the step completed
- Added import re for node label sanitization in tag sync
Architecture decisions:
- AD-TAGS-001: Tag sync failures are non-fatal – the lab can function without tags (they're for port documentation/external-interface resolution)
- Option A confirmed: Pipeline steps as inline methods on LabletReconciler – avoids premature abstraction while keeping all instantiation logic colocated
Phase 4: Scheduler Simplification (resource-scheduler) ✅ COMPLETE¶
Priority: MEDIUM | Effort: Small | Risk: LOW
Completed: 2026-03-02 | Tests: 78 passed (resource-scheduler), 0 new failures | Lint: All checks passed
| # | File | Action | Status | Notes |
|---|---|---|---|---|
| 1 | application/hosted_services/scheduler_hosted_service.py | MODIFY | ✅ | Removed static port allocation block from _handle_assign() (lines 420–427). Replaced with allocated_ports: dict[str, int] = {}. Updated docstring and inline comments. Port allocation deferred to lablet-controller pipeline (ports_alloc step). |
| 2 | application/services/placement_engine.py | VERIFY | ✅ | _check_port_availability() confirmed count-only (checks len(port_entries) vs available). No change needed. |
| 3 | tests/unit/.../test_scheduler_hosted_service_phase2.py | MODIFY | ✅ | Updated test_reconcile_includes_ports_in_schedule to assert allocated_ports == {} instead of checking for specific port names. |
Architecture decisions:
- AD-P4-001: Defer port allocation from scheduler to lablet-controller pipeline – scheduler passes {}, real allocation happens during the ports_alloc step
- Pre-existing test_scheduler_service.py failures (8 tests) confirmed unrelated – legacy test file using old API method names (schedule_instance, get_pending_instances)
Phase 5: Lab Discovery Port Registration (lablet-controller) ✅ COMPLETE¶
Priority: MEDIUM | Effort: Medium | Risk: LOW
Completed: 2026-03-02 | Tests: 255 passed (lablet-controller), 0 new failures | Lint: All checks passed
| # | File | Action | Status | Notes |
|---|---|---|---|---|
| 1 | application/hosted_services/lab_discovery_service.py | MODIFY | ✅ | Added _register_ports_and_sync_tags() called after discover_lab_records() for BOOTED/STARTED labs. Finds LabRecord via CPA get_lab_records_for_worker(), calls allocate_lab_record_ports() (idempotent), then syncs CML node tags via _sync_tags_if_missing(). Added PortRegistrationResult class, port/tag stat counters, get_stats() update. |
| 2 | application/hosted_services/lab_discovery_service.py | ADD method | ✅ | _sync_tags_if_missing(host, lab_id, allocated_ports) – builds expected tags per node from port name convention ({safe_label}_{protocol}), compares with CML node tags, merges and PATCHes missing tags. AD-TAGS-001 non-fatal. |
| 3 | tests/test_phase9_lab_discovery.py | MODIFY | ✅ | Added 21 tests in 4 new test classes: TestPortRegistrationResult (2), TestRegisterPortsAndSyncTags (10), TestSyncTagsIfMissing (8), TestDiscoveryPortRegistrationIntegration (3). Covers: BOOTED/STARTED filtering, idempotent allocation, tag merge, non-fatal errors, stats tracking. |
Architecture decisions:
- AD-P5-001: Port registration runs after the discover_lab_records() CPA call, as a non-blocking post-processing step. Discovery results are unaffected by port registration failures – separation of concerns between discovery sync and port management.
- AD-P5-002: Tag sync uses merge semantics – existing non-port tags are preserved, missing port tags are added. Tags are sorted for deterministic ordering.
- Labs without based_on_definition_id are skipped (unmanaged/ad-hoc labs don't have port templates).
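The merge semantics of AD-P5-002 can be sketched as a pure function. The function name is illustrative; the port-tag format `protocol:port` follows AD-TAGS-001, and deriving the protocol from the `{safe_label}_{protocol}` port-name suffix is the convention stated above:

```python
def merge_port_tags(existing_tags: list[str], allocated_ports: dict[str, int]) -> list[str]:
    """Merge allocated-port tags into a node's existing CML tags (sketch).

    Existing non-port tags are preserved, missing "protocol:port" tags are
    added, and the result is sorted for deterministic ordering (AD-P5-002).
    allocated_ports maps a port name like "router1_serial" to a port number;
    the protocol is the suffix after the last underscore (assumed convention).
    """
    merged = set(existing_tags)
    for port_name, port_number in allocated_ports.items():
        protocol = port_name.rsplit("_", 1)[-1]
        merged.add(f"{protocol}:{port_number}")
    return sorted(merged)
```

Because the merge is idempotent (a set union), re-running discovery against a lab whose tags are already correct produces no changes.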
Phase 6: Worker Read Model & Frontend (control-plane-api + worker-controller) ✅ COMPLETE¶
Priority: LOW | Effort: Medium | Risk: LOW
Completed: 2026-03-02 | Tests: 822 passed, 0 new failures | Lint: All checks passed
| # | File | Action | Status | Notes |
|---|---|---|---|---|
| 1 | application/dtos/cml_worker_dto.py | MODIFY | ✅ | Added allocated_port_count, available_port_count, port_utilization_pct fields |
| 2 | application/mappers/cml_worker_mapper.py | MODIFY | ✅ | Added port usage defaults (0) – enriched by query handler |
| 3 | application/queries/get_cml_worker_by_id_query.py | MODIFY | ✅ | Injected PortAllocationService, enriches DTO with get_port_usage_stats() (graceful fallback to 0 on error) |
| 4 | application/dtos/lablet_session_dto.py | MODIFY | ✅ | Added instantiation_progress: dict \| None field to LabletSessionDto + mapping from state.instantiation_progress |
| 5 | ui/src/scripts/components/modals/SessionDetailsModal.js | MODIFY | ✅ | Replaced Pipeline tab "Coming soon…" with full DAG-ordered step progress visualization. Shows: step dot indicators (✅/🔄/❌/⏭/⏳), human labels, status badges, retry counts, error details, result data summary, progress bar, pipeline timing. SSE-reactive (invalidates cache on refresh). |
| 6 | tests/application/test_lablet_session_queries.py | MODIFY | ✅ | Added instantiation_progress = None to mock state |
| 7 | tests/application/test_lablet_session_dtos_and_sse.py | MODIFY | ✅ | Added instantiation_progress = None to mock state |
Architecture decisions:
- AD-P6-001: Port usage enrichment is done in the query handler (not the mapper) because PortAllocationService is async and requires etcd access – the mapper is a pure synchronous function. Port stats default to 0 and are enriched post-mapping.
- AD-P6-002: Pipeline tab is SSE-reactive – the cache is invalidated on every refreshCurrentTab() call so step progress updates in real time during instantiation.
Phase 7: Testing (control-plane-api) ✅ COMPLETE¶
Priority: MEDIUM | Effort: Medium | Risk: LOW
Completed: 2026-03-02 | Tests: 773 passed (1 pre-existing rename-integrity failure), 0 new failures | Lint: 0 new lint errors
| # | File | Action | Status | Notes |
|---|---|---|---|---|
| 1 | tests/domain/test_instantiation_progress.py | CREATE (CPA) | ✅ | ~50 tests across 8 test classes: TestStepResult (5), TestBuildDefault (8), TestDAGResolution (10), TestStepTransitions (9), TestPipelineCompletion (11), TestGetStep (2), TestProgressSerialization (3), TestFullPipelineWalkthrough (2). Covers DAG resolution, skip semantics, serialization round-trip, full pipeline walkthrough |
| 2 | tests/application/test_allocate_lab_record_ports_command.py | CREATE (CPA) | ✅ | 9 tests: nominal allocation, idempotency (already allocated), lab record not found, no definition id, definition not found, no port template, empty port template, dict port template coercion, allocation service failure |
| 3 | tests/application/test_bind_lab_to_session_command.py | CREATE (CPA) | ✅ | 7 tests: nominal binding with port denormalization, binding without ports, idempotency (already bound), lab record not found, session not found, already bound to different session, conflicting binding |
| 4 | tests/application/test_expire_lablet_session_command.py | CREATE (CPA) | ✅ | 10 tests: nominal expiry (unbind + capacity release), allocated_ports unchanged invariant, idempotency (already expired), session not found, invalid state transition (409), no lab record bound, no worker (skip capacity release), capacity release failure (still ok), capacity release exception (still ok), custom reason propagation |
| 5 | tests/test_instantiation_pipeline.py | (lablet-controller) | ✅ | 53 tests – completed in Phase 3 |
Key testing insight (gotcha): InstantiationProgress.next_executable_step() treats skipped as satisfied – when testing that a failed step blocks dependents, use build_default(has_port_template=True, has_content_sync=True, has_lds=True) so intermediate steps are pending (not pre-skipped); otherwise mark_ready remains executable despite upstream failure.
Deviations from plan:
- Test file paths use flat tests/domain/ and tests/application/ directories (not tests/test_*.py at root)
- Expire tests cover 10 scenarios (not 4 as originally estimated) – added critical invariant tests for allocated_ports unchanged, capacity release edge cases, and custom reason propagation
10. Migration & Backward Compatibility¶
10.1 Existing Sessions¶
Sessions created before this change will have instantiation_progress = None. The pipeline executor handles this gracefully:
def _build_default_progress(self, instance: LabletSessionReadModel) -> InstantiationProgress:
    """Build default progress for sessions without pipeline tracking."""
    # For sessions already past INSTANTIATING, mark all steps as completed
    if instance.status in ("READY", "RUNNING", ...):
        return InstantiationProgress(steps=[...all completed...])
    # For sessions currently INSTANTIATING, start fresh
    return InstantiationProgress(steps=[...all pending...])
10.2 Scheduler Port Allocation¶
The scheduler currently passes static port values. After Phase 4, it passes {}. The CPA ScheduleLabletSessionCommand already handles empty allocated_ports – the session is created with allocated_ports=None and real ports are allocated during the ports_alloc pipeline step.
10.3 LabRecord Field Addition¶
Adding allocated_ports to LabRecord (the LabRecordState dataclass) is backward-compatible – the field defaults to None. Existing lab records in MongoDB will deserialize with allocated_ports=None. Note: LabRunRecord intentionally has NO port fields – ports are a topology-level concern on the LabRecord, not a runtime concern.
10.4 CML SPI Backward Compatibility¶
The new patch_node_tags() method is additive – no existing SPI methods are modified. Workers running older CML versions that don't support PATCH /api/v0/labs/{lab_id}/nodes/{node_id} will fail the tags_sync step gracefully. The pipeline step should treat tag sync failures as non-fatal warnings (log and continue), since the lab can still function without tags – tags are primarily for port documentation and external-interface resolution.
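The request shape used by tags_sync can be sketched as a pure builder (the endpoint path and body come from this document; the function name, the https scheme, and sorting the tags per AD-P5-002 are assumptions). Keeping it pure separates the fragile part, the HTTP PATCH the caller issues with worker credentials and non-fatal error handling, from the part that is trivially testable:

```python
def build_node_tags_patch(host: str, lab_id: str, node_id: str,
                          tags: list[str]) -> tuple[str, dict]:
    """Build the CML PATCH request used by tags_sync (illustrative sketch).

    Returns (url, json_body). The caller issues the HTTP PATCH with worker
    credentials and, per AD-TAGS-001, treats any failure as a non-fatal
    warning rather than failing the pipeline step.
    """
    url = f"https://{host}/api/v0/labs/{lab_id}/nodes/{node_id}"
    return url, {"tags": sorted(tags)}  # sorted for deterministic ordering
```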
10.5 API Versioning¶
New internal endpoints are additive (no breaking changes). The update_instantiation_progress, allocate_lab_record_ports, sync_lab_tags, bind_lab_to_session, and expire_session endpoints are called only by the updated lablet-controller. Old lablet-controllers (pre-pipeline) continue using the monolithic flow until upgraded.
10.6 Rollback Plan¶
If the pipeline refactor causes issues:
- Revert lablet-controller to the monolithic _handle_instantiating()
- CPA changes are backward-compatible (new fields default to None)
- Scheduler changes are backward-compatible (empty ports were already handled)
Appendix A: Sequence Diagram β Full Pipeline Flow¶
Participants: resource-scheduler, control-plane-api (CPA), lablet-controller, CML Labs SPI, LDS SPI.
Arrows without an explicit source originate from lablet-controller.

1. resource-scheduler → CPA: schedule_session(ports={})
   CPA: PENDING → SCHEDULED; etcd publish → lablet-controller
2. lablet-controller: reconcile(SCHEDULED)
   → CPA: start_instantiation()
   CPA: SCHEDULED → INSTANTIATING; etcd publish → lablet-controller
3. lablet-controller: reconcile(INSTANTIATING) – pipeline steps, DAG roots first
   Step 1: content_sync – verify definition; → CPA: update_progress()
   Step 2: variables – skipped (placeholder)
   Step 3: lab_resolve (requires: content_sync)
       → CML: resolve/import; → CPA: update_progress()
   Step 4: ports_alloc (requires: lab_resolve)
       → CPA: allocate_lab_record_ports()
       CPA: PortAllocationService (etcd atomic alloc, keyed by lab_record_id) sets LabRecord.allocated_ports
   Step 5: tags_sync (requires: ports_alloc)
       → CML: PATCH node tags (per node: protocol:port); → CPA: update_progress()
   Step 6: lab_binding (requires: tags_sync)
       → CPA: bind_lab_to_session()
       CPA: LabRunRecord created (NO ports on run); session gets denormalized ports from LabRecord
   Step 7: lab_start (requires: lab_binding)
       → CML: start_lab (poll until BOOTED); → CPA: update_progress()
   Step 8: lds_provision (requires: lab_start)
       → LDS: create_session, set_devices; → CPA: create_user_session()
   Step 9: mark_ready (requires: lds_provision)
       → CPA: mark_session_ready(); CPA: INSTANTIATING → READY
4. (timeslot expires)
   lablet-controller: reconcile() detects expiry
   → CPA: expire_session()
   CPA: close LabRunRecord; unbind LabRecord (ports UNCHANGED on LabRecord – topology); release capacity; READY → EXPIRED
   → CML: stop CML lab
   → LDS: delete LDS session
Appendix B: Decision Record¶
ADR-031: Checkpoint-Based Instantiation Pipeline
- Decision: Replace the monolithic _handle_instantiating() with a DAG-based checkpoint pipeline where progress is persisted per-step to the LabletSession aggregate. Steps declare their prerequisites; next_executable_step() resolves execution order. The pipeline has 9 steps, including tags_sync.
- Rationale: The current waterfall approach has no recovery, no visibility, and no per-step retry capability. Combined with the drain-loop fix, checkpoints enable reliable multi-step advancement.
- Alternatives Rejected:
  - Workflow engine (Temporal/Prefect): Adds operational complexity, overkill for 9 steps
  - Saga pattern: Steps don't have compensating actions (can't "un-import" a lab meaningfully)
  - Linear ordering: Steps have true dependencies (e.g., tags_sync needs the ports_alloc result), so a DAG is the natural representation
- Related: AD-DRAIN-001 (prerequisite), ADR-020 (session consolidation), ADR-029 (port template), ADR-030 (resource observation)
AD-PORT-001: Port Allocation as LabRecord Topology Concern
- Decision: Ports are allocated to the LabRecord (LabRecordState.allocated_ports: dict[str, int] | None), NOT to a LabRunRecord or LabletSession. The LabletSession gets a denormalized copy via the lab_binding step. Port allocation uses etcd, keyed by lab_record_id. Ports persist across start/stop/wipe cycles and are released only when the LabRecord is deleted.
- Rationale: Ports are a topology-level concern – they represent how a lab's nodes are reachable on a worker's port range. A lab's topology (nodes, edges, tags) persists across start/stop/wipe operations. If ports were on LabRunRecord, they would be lost on each stop/wipe cycle, requiring re-allocation and re-tagging. By owning ports at the LabRecord level, we ensure port stability for the lab's entire lifetime on a worker.
- Scheduler impact: The scheduler only validates port count availability (validate_port_availability()) and passes allocated_ports={}. Real allocation is a provisioning concern, not a scheduling concern.
- Lab discovery: Discovered labs register their existing ports on the LabRecord via AllocateLabRecordPortsCommand to prevent conflicts.
AD-BIND-001: LabRunRecord Without Port Fields
- Decision: The lab_binding pipeline step creates a LabRunRecord WITHOUT allocated_ports. LabRunRecord is a pure runtime concern (run_id, started_at, stopped_at, duration, stop_reason). Port information comes from the parent LabRecord.allocated_ports. The LabRecord tracks the active binding (active_lablet_session_id, active_binding_id).
- Rationale: Ports don't change between runs – they are topology-level (see AD-PORT-001). Duplicating ports on every LabRunRecord would create data inconsistency risks and storage waste. A LabRunRecord captures the complete context of a lab run – who started it, when, for how long, for which session. Port lookup is via LabRecord.allocated_ports (single source of truth).
AD-TAGS-001: CML Node Tags Synced with Allocated Ports
- Decision: After port allocation (the `ports_alloc` step), a new `tags_sync` step writes allocated ports as CML node tags using `PATCH /api/v0/labs/{lab_id}/nodes/{node_id}` with `tags: list[str]`. Tag format: `protocol:port_number` (e.g., `serial:3001`, `vnc:3002`). Tags persist across start/stop/wipe because they are topology-level. The `tags_sync` step is non-fatal: if CML doesn't support PATCH or the call fails, the step logs a warning and proceeds.
- Rationale: CML node tags are the canonical mechanism for documenting which ports map to which protocols on each node. They are consumed by `PortTemplate.from_cml_nodes()` to build `ExternalInterface` value objects. Without tag sync, allocated ports would be invisible to topology inspection, and lab discovery would not be able to reconstruct port mappings for already-running labs.
- Tag lifecycle: Tags are written once (after `ports_alloc`) and persist for the lab's lifetime. They are NOT modified by stopping, wiping, or restarting the lab. Tags are removed only if the lab is deleted from CML (which also triggers LabRecord deletion and port release).
- CML SPI extension: Requires a `patch_node_tags(worker_id, lab_id, node_id, tags: list[str])` method added to the CML SPI client.
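A sketch of the non-fatal behavior described above, assuming a hypothetical CML SPI client object that exposes the `patch_node_tags()` method; the function name and input shape are illustrative.

```python
import logging

logger = logging.getLogger("tags_sync")

def sync_node_tags(cml, worker_id: str, lab_id: str,
                   node_ports: dict[str, dict[str, int]]) -> bool:
    """Write allocated ports as `protocol:port` node tags; never raise.

    `node_ports` maps node_id -> {protocol: port}. Returns False if any
    PATCH failed, but the pipeline proceeds either way (non-fatal step).
    """
    ok = True
    for node_id, ports in node_ports.items():
        # Tag format from AD-TAGS-001, e.g. "serial:3001", "vnc:3002"
        tags = [f"{protocol}:{port}" for protocol, port in sorted(ports.items())]
        try:
            cml.patch_node_tags(worker_id, lab_id, node_id, tags)
        except Exception as exc:  # non-fatal by design
            logger.warning("tags_sync failed for node %s: %s", node_id, exc)
            ok = False
    return ok
```

Swallowing the exception and returning a boolean keeps the step's contract explicit: a CML deployment without PATCH support degrades to a logged warning rather than a stuck instantiation.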
AD-TIMESLOT-001: Timeslot-Centric Session Lifecycle
- Decision: A LabletSession maps 1:1 to a timeslot (`timeslot_start`, `timeslot_end`). Expiry triggers downstream cleanup: LabRunRecord closure, LabRecord unbinding, capacity release, and LDS session teardown. Ports are NOT released at session expiry; they belong to the LabRecord and persist for lab reuse.
- Rationale: Keeping the lifecycle simple (a forward-only state machine with extend support) avoids complexity. The timeslot is the authoritative lifecycle boundary: when it expires, runtime resources (capacity, LDS session) are cleaned up, but topology-level resources (ports, tags) remain on the LabRecord for future sessions.