ADR-034: Pipeline Executor & Lifecycle Phase Handlers¶
| Attribute | Value |
|---|---|
| Status | Proposed |
| Date | 2026-03-02 |
| Deciders | Architecture Team |
| Supersedes | Partially supersedes ADR-031 §execution-model |
| Related ADRs | ADR-011 (APScheduler Removal), ADR-031 (Checkpoint Pipeline), ADR-030 (Resource Observation), ADR-020 (Session Entity Model), ADR-019 (LabRecord Aggregate) |
| Related Decisions | AD-PIPELINE-002, AD-PIPELINE-003, AD-PIPELINE-004, AD-PIPELINE-007, AD-PIPELINE-008, AD-PIPELINE-009 |
Errata (Sprint C):
- `fail_session()` referenced in §3 pseudocode is replaced by `terminate_session()` per AD-PIPELINE-008 (internal terminate endpoint)
- Backward-compatibility fallback (definitions without pipelines) is removed per AD-PIPELINE-009 — all definitions MUST have pipelines
- Pipeline failures do NOT auto-trigger teardown per AD-PIPELINE-007 — the reconciler retries with resumability, then terminates after max_retries is exhausted
- See `docs/implementation/ADR-034-sprint-c-prompt.md` for the authoritative implementation spec
Context¶
Problem 1: Pipeline Execution Stall¶
ADR-031 introduced a checkpoint-based instantiation pipeline with 9 DAG-ordered steps.
The design executes one step per reconciliation cycle, returning ReconciliationResult.requeue() after each step. However, LabletSessionInstantiationProgressUpdatedDomainEvent has no etcd projector, so progress updates don't trigger watch events. The next reconcile waits for the 30-second polling interval. With 9 steps, the pipeline takes ~4.5 minutes instead of ~30 seconds.
Problem 2: Single Pipeline, Hardcoded¶
ADR-031 only covers the instantiate pipeline. The system needs at least 4 more pipelines across the LabletSession lifecycle:
| Pipeline | Trigger Status | Purpose |
|---|---|---|
| `instantiate` | `INSTANTIATING` | Lab import, ports, LDS, readiness |
| `collect_evidence` | `COLLECTING` | Capture configs, screenshots, pcaps |
| `compute_grading` | `GRADING` | Evaluate evidence against rubric |
| `teardown` | `STOPPING` | Stop lab, deregister LDS, wipe, archive |
| `warmup` (future) | `WARMING` | Pre-instantiate warm pool instances |
Each pipeline shares the same DAG execution mechanics but differs in steps.
Problem 3: Reconciler Monolith¶
The LabletReconciler is 2009 lines with a monolithic reconcile() that routes by status to _handle_{status}() methods. Each handler runs inline within the reconcile call and returns a ReconciliationResult. This one-reconcile-per-action model doesn't fit multi-step pipelines.
Problem 4: LabletDefinition Aggregate Bloat¶
LabletDefinitionState carries 46 fields spanning 5 distinct responsibilities. Pipeline definitions, when added, would be a 6th concern on an already overloaded aggregate. The upstream_sync_status field alone is an unbounded JSON blob containing 5 nested service results with their own status/version/logs.
Problem 5: No Background Job Framework¶
ADR-011 removed APScheduler in January 2026, replacing all background jobs with reconciliation loops. This was correct for simple poll-and-act patterns but doesn't address long-running, multi-step pipelines that need to self-drive within a lifecycle phase.
APScheduler 4.x was evaluated (async-native, MongoDB data store, Redis event broker, distributed scheduling) but rejected because:
- It's pre-release — README explicitly warns "do NOT use in production"
- It solves scheduling (when to run) not orchestration (DAG execution)
- We'd still need PipelineExecutor on top — APScheduler adds a layer without eliminating one
- It introduces a second scheduling paradigm alongside reconcilers (violates ADR-011's principle)
- Our existing stack (MongoDB + etcd + reconcilers) already provides persistence, crash recovery, and leader election
Decision¶
1. PipelineExecutor — DAG-Driven Inner Loop¶
Replace the one-step-per-reconcile pattern with a PipelineExecutor utility that runs all steps sequentially within a single invocation. The reconciler delegates to the executor; the executor self-drives through the DAG.
```text
reconcile(session_id)
  → status routing: _handle_{status}()
  → handler ensures LifecyclePhaseHandler is active for this session
  → handler runs PipelineExecutor if a pipeline is defined
  → PipelineExecutor runs all steps in inner loop
  → progress persisted after each step (crash recovery)
  → on completion: status transition + etcd event
```
PipelineExecutor Responsibilities¶
| Concern | Implementation |
|---|---|
| DAG resolution | Topological sort of steps by needs + skip evaluation |
| Skip evaluation | simpleeval library — safe Python expression evaluation, zero dependencies |
| Step dispatch | Handler lookup via _step_{handler}() naming convention |
| Progress persistence | After each step, persist to CPA via update_pipeline_progress() |
| Retry | Per-step retry.max_attempts and retry.delay_seconds |
| Timeout | Per-step timeout_seconds via asyncio.wait_for() |
| Optional steps | optional: true — failure doesn't block downstream steps |
| Context injection | $SESSION, $DEFINITION, $WORKER, $STEPS variables |
| Output collection | outputs section maps dot-path expressions to step result_data |
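The DAG-resolution concern above can be sketched with the standard library's `graphlib`; this is a minimal illustration (the step names are hypothetical, and the actual `PipelineExecutor` implementation may differ):

```python
from graphlib import TopologicalSorter  # stdlib, Python 3.9+


def topological_order(steps: list[dict]) -> list[dict]:
    """Order pipeline steps so every step runs after its `needs` dependencies.

    Raises graphlib.CycleError if the YAML declares a dependency cycle.
    """
    by_name = {step["name"]: step for step in steps}
    # Map each step to the set of steps it depends on
    graph = {step["name"]: set(step.get("needs", [])) for step in steps}
    return [by_name[name] for name in TopologicalSorter(graph).static_order()]


# Example: two steps that both depend on "import_lab" (names illustrative)
steps = [
    {"name": "ports", "needs": ["import_lab"]},
    {"name": "import_lab"},
    {"name": "register_lds", "needs": ["import_lab"]},
]
ordered = [s["name"] for s in topological_order(steps)]
```

`static_order()` guarantees dependencies appear before their dependents, so `import_lab` is always first here.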
Expression Evaluation with simpleeval¶
simpleeval is a zero-dependency, single-file Python library (~30KB) that evaluates Python-like expressions via AST walking — no eval() or exec(). It provides:
- Safe sandboxing (blocks `_`-prefixed attributes, dangerous builtins)
- Python-native syntax matching YAML author expectations
- Configurable name handlers for `$SESSION`, `$DEFINITION`, etc.
```python
from simpleeval import SimpleEval

evaluator = SimpleEval()
evaluator.names = {
    "SESSION": session_read_model,
    "DEFINITION": definition_state,
    "WORKER": worker_read_model,
    "STEPS": completed_steps_data,
}

# skip_when expressions from YAML
evaluator.eval("not DEFINITION.form_qualified_name")  # → True if no FQN
evaluator.eval("not DEFINITION.port_template")        # → True if no ports
evaluator.eval("not SESSION.user_session_id")         # → True if no LDS session
```
Note: $ prefix in YAML is convention for documentation clarity. The evaluator strips $ before lookup.
PipelineResult¶
```python
from dataclasses import dataclass
from typing import Any


@dataclass
class PipelineResult:
    """Result of a complete pipeline execution."""
    pipeline_name: str
    status: str                # "completed" | "failed" | "partial"
    steps_completed: int
    steps_failed: int
    steps_skipped: int
    duration_seconds: float
    outputs: dict[str, Any]    # Resolved output expressions
    error: str | None = None   # First fatal error
```
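The per-step retry and timeout concerns from the responsibilities table can be sketched as a small wrapper; this is a hypothetical helper (the name `run_step_with_retry` and the demo step are not from the source), shown here only to illustrate the `asyncio.wait_for()` + `max_attempts`/`delay_seconds` mechanics:

```python
import asyncio
from typing import Any, Awaitable, Callable


async def run_step_with_retry(
    handler: Callable[[], Awaitable[dict[str, Any]]],
    *,
    timeout_seconds: float = 60,
    max_attempts: int = 1,
    delay_seconds: float = 0,
) -> dict[str, Any]:
    """Run one step with a per-attempt timeout and a bounded retry loop."""
    last_error: Exception | None = None
    for attempt in range(1, max_attempts + 1):
        try:
            # Per-step timeout, as in the responsibilities table
            return await asyncio.wait_for(handler(), timeout=timeout_seconds)
        except Exception as e:  # includes TimeoutError; CancelledError passes through
            last_error = e
            if attempt < max_attempts:
                await asyncio.sleep(delay_seconds)
    raise RuntimeError(f"step failed after {max_attempts} attempts") from last_error


# Demo: a step that fails twice, then succeeds on the third attempt
calls = {"n": 0}


async def flaky_step() -> dict[str, Any]:
    calls["n"] += 1
    if calls["n"] < 3:
        raise ValueError("transient failure")
    return {"ok": True}


result = asyncio.run(run_step_with_retry(flaky_step, timeout_seconds=5, max_attempts=3))
```

Note that `asyncio.CancelledError` is a `BaseException` subclass, so cancellation from handler shutdown is not swallowed by the retry loop.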
2. LifecyclePhaseHandler — Per-Session Background Tasks¶
Each lifecycle phase that involves multi-step work gets a LifecyclePhaseHandler — a managed asyncio.Task that self-drives the pipeline for a specific session.
```python
import asyncio
import logging

logger = logging.getLogger(__name__)


class LifecyclePhaseHandler:
    """Manages pipeline execution for one session in one lifecycle phase."""

    def __init__(
        self,
        session_id: str,
        pipeline_name: str,
        pipeline_def: dict,
        context: PipelineContext,
        executor: PipelineExecutor,
    ):
        self.session_id = session_id
        self.pipeline_name = pipeline_name
        self._pipeline_def = pipeline_def
        self._context = context
        self._executor = executor
        self._task: asyncio.Task | None = None

    @property
    def is_running(self) -> bool:
        """True while the background task exists and has not finished."""
        return self._task is not None and not self._task.done()

    async def start(self) -> None:
        """Start the handler as a background task."""
        if self._task and not self._task.done():
            return  # idempotent — already running
        self._task = asyncio.create_task(
            self._run(),
            name=f"pipeline:{self.pipeline_name}:{self.session_id}",
        )

    async def _run(self) -> None:
        """Execute the pipeline, handle completion/failure."""
        try:
            result = await self._executor.execute(
                pipeline_def=self._pipeline_def,
                context=self._context,
            )
            await self._on_complete(result)
        except asyncio.CancelledError:
            logger.info(f"Pipeline {self.pipeline_name} cancelled for {self.session_id}")
            raise
        except Exception as e:
            logger.error(f"Pipeline {self.pipeline_name} crashed for {self.session_id}: {e}")
            await self._on_error(e)

    async def _on_complete(self, result: PipelineResult) -> None:
        """Handle successful pipeline completion — trigger status transition."""
        # Implementation varies by pipeline type
        pass

    async def _on_error(self, error: Exception) -> None:
        """Handle a fatal pipeline crash — surface the failure to the reconciler."""
        # Implementation varies by pipeline type
        pass

    async def stop(self) -> None:
        """Cancel the handler gracefully."""
        if self._task and not self._task.done():
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
```
Reconciler Integration¶
The reconciler manages handlers via a dict[str, LifecyclePhaseHandler]:
```python
# In LabletReconciler
_active_handlers: dict[str, LifecyclePhaseHandler] = {}


async def _handle_instantiating(self, instance) -> ReconciliationResult:
    handler_key = f"{instance.id}:instantiate"
    if handler_key in self._active_handlers:
        handler = self._active_handlers[handler_key]
        if handler.is_running:
            return ReconciliationResult.success()  # handler is self-driving
        # Handler finished — clean up
        del self._active_handlers[handler_key]
        return ReconciliationResult.success()

    # No pipeline defined → error
    pipeline_def = self._get_pipeline_def(instance, "instantiate")
    if not pipeline_def:
        await self._api.fail_session(
            instance.id,
            reason="No 'instantiate' pipeline defined. Configure pipelines on the LabletDefinition.",
        )
        return ReconciliationResult.failed("No pipeline defined")

    # Start new handler
    handler = LifecyclePhaseHandler(
        session_id=instance.id,
        pipeline_name="instantiate",
        pipeline_def=pipeline_def,
        context=self._build_context(instance),
        executor=self._pipeline_executor,
    )
    self._active_handlers[handler_key] = handler
    await handler.start()
    return ReconciliationResult.success()
```
Leader Step-Down Cleanup¶
When the reconciler loses leadership, all active handlers are cancelled:
```python
async def _step_down(self) -> None:
    # Cancel all active pipeline handlers
    for key, handler in list(self._active_handlers.items()):
        await handler.stop()
    self._active_handlers.clear()
    # ... existing child service teardown ...
    await super()._step_down()
```
3. Concurrency Model — Watch Events, Per-Session Serialization, and Self-Driving¶
This section addresses three architectural questions:
- How does a running `LifecyclePhaseHandler` coexist with incoming etcd watch events?
- Should reconciliation be serialized per aggregate root?
- With polling disabled, how does the system remain both reactive and proactive?
Current Reconciler Concurrency (Pre-ADR-034)¶
The WatchTriggeredHostedService base class provides two call paths into reconcile():
| Path | Trigger | Scope | Concurrency |
|---|---|---|---|
| Polling | Timer every `reconcile_interval` | ALL non-terminal resources | Up to `max_concurrent` (10) via `asyncio.Semaphore` |
| Watch | etcd key change → debounce (0.5s) → drain-loop | Single resource by ID | Sequential — simple `for` loop |
| Startup sweep | Once on leader election | ALL non-terminal resources | Up to `max_concurrent` via polling path |
Existing race condition: The polling path checks ResourceState.in_progress before launching a reconcile task, but the watch path does not — it calls _reconcile_single() directly. A watch event and a polling cycle can reconcile the same session concurrently with no mutual exclusion.
For simple reconcile operations (check state, do one action, return), this race is mostly harmless — idempotent operations tolerate double-execution. But for LifecyclePhaseHandler management, the race can create duplicate handlers.
Per-Session Serialization via asyncio.Lock¶
Decision: Add a per-session asyncio.Lock to serialize reconciliation for the same aggregate root.
```python
class LabletReconciler(WatchTriggeredHostedService[LabletSessionReadModel]):
    _active_handlers: dict[str, LifecyclePhaseHandler] = {}
    _session_locks: dict[str, asyncio.Lock] = {}

    def _get_session_lock(self, session_id: str) -> asyncio.Lock:
        """Get or create a lock for a specific session (lazy initialization)."""
        if session_id not in self._session_locks:
            self._session_locks[session_id] = asyncio.Lock()
        return self._session_locks[session_id]

    async def reconcile(self, instance: LabletSessionReadModel) -> ReconciliationResult:
        """Reconcile a single session — serialized per session_id."""
        lock = self._get_session_lock(instance.id)
        async with lock:
            return await self._reconcile_inner(instance)

    async def _reconcile_inner(self, instance: LabletSessionReadModel) -> ReconciliationResult:
        """Actual reconciliation logic — guaranteed single-threaded per session."""
        match instance.status:
            case "INSTANTIATING":
                return await self._handle_instantiating(instance)
            case "COLLECTING":
                return await self._handle_collecting(instance)
            # ... other status handlers ...
```
Why per-session and not a global lock?
| Approach | Throughput | Safety | Complexity |
|---|---|---|---|
| Global lock (one at a time) | ❌ Sequential — 50 sessions = 50× latency | ✅ Simple | Low |
| No lock (current) | ✅ Concurrent | ❌ Handler duplication risk | Low |
| Per-session lock ✅ | ✅ Concurrent across sessions, serial per session | ✅ No duplication | Medium |
The lock is cheap — acquiring an uncontended `asyncio.Lock` completes without suspending, and each session_id gets its own lock. Two reconciliations for different sessions proceed concurrently; two for the same session queue correctly.
Lock lifecycle: Locks are lazily created and cleaned up during leader step-down:
```python
async def _step_down(self) -> None:
    for key, handler in list(self._active_handlers.items()):
        await handler.stop()
    self._active_handlers.clear()
    self._session_locks.clear()  # GC all locks on leadership loss
    await super()._step_down()
```
Watch + Handler Interaction Sequence¶
When a LifecyclePhaseHandler is actively running a pipeline and a new watch event arrives for the same session, the sequence is:
```text
Timeline (single session: session-123)
────────────────────────────────────────────────────
t0: Watch event → status=INSTANTIATING
    reconcile(session-123) → acquires lock
    → _handle_instantiating() → no handler exists → create + start handler
    → handler.start() → asyncio.Task begins executing pipeline steps
    → return ReconciliationResult.success()
    → releases lock

t1: Handler is running step 3 of 9 (self-driving inner loop)
    Handler calls CPA: update_instantiation_progress(step=3, ...)
    CPA emits event → etcd projector writes /lcm/sessions/session-123/progress

t2: Watch event arrives for session-123 (from t1's etcd write)
    → added to _pending_reconciles set
    → debounced_reconcile() fires
    reconcile(session-123) → acquires lock
    → _handle_instantiating()
    → handler_key in _active_handlers? YES
    → handler.is_running? YES
    → return ReconciliationResult.success()  ← NO-OP
    → releases lock

t3: Handler is running step 7 of 9 (unaware of t2)
    (watch event was a no-op — handler is self-driving)

t4: Handler completes all 9 steps → _on_complete()
    → calls CPA: transition session INSTANTIATING → READY
    → CPA emits event → etcd writes /lcm/sessions/session-123/status=READY
    → handler task finishes naturally

t5: Watch event arrives for session-123 (from t4's status change)
    reconcile(session-123) → acquires lock
    → status is now READY → _handle_ready()
    → handler_key "session-123:instantiate" in _active_handlers? YES but done
    → clean up: del _active_handlers[handler_key]
    → _handle_ready() logic (no pipeline needed — acknowledge readiness)
    → return ReconciliationResult.success()
    → releases lock
```
Key insight: The handler IS the proactive loop. Watch events that arrive while a handler is running are intentional no-ops — the reconciler confirms the handler is alive and returns. No work is duplicated.
Self-Driving Without Polling — Reactive + Proactive Architecture¶
With polling always disabled, the system operates on two complementary mechanisms:
| Mechanism | Scope | Purpose | Trigger |
|---|---|---|---|
| etcd watch (reactive) | Cross-step, cross-phase | Detect state transitions, launch/check handlers | External: CPA projector, admin action, another controller |
| Inner loop (proactive) | Within a single pipeline | Self-drive through all steps without external triggers | Internal: PipelineExecutor.execute() loop |
Reactive path — "Something changed, should I act?":
```text
etcd key change → watch event → debounce → reconcile(session)
  → status routing → _handle_{status}()
  → check: handler running? → yes: no-op / no: start one
```
Proactive path — "I'm already acting, keep going":
```text
LifecyclePhaseHandler._run()
  → PipelineExecutor.execute(pipeline_def, context)
    → for step in topological_order(steps):
        → evaluate skip_when
        → dispatch to _step_{handler}()
        → persist progress
        → collect outputs
    → return PipelineResult
  → _on_complete() → signal CPA → etcd write → triggers NEXT reactive cycle
```
The handler replaces what polling would do. In a polling model, each 30-second tick would re-enter reconcile() and advance one step. The handler instead runs a tight async for loop — it is the polling, just without the 30-second gaps.
Lifecycle phase transitions are the bridge: When a pipeline completes, the handler signals CPA (status transition), which writes to etcd, which triggers a watch event, which enters the reconciler for the next lifecycle phase. The chain is:
```text
┌─── reactive ───┐          ┌── proactive ──┐
│                │          │               │
watch event ──► reconcile() ──► handler ──► pipeline ──► steps
                                   │                       │
                                   └───── inner loop ──────┘
                                           │
                                      on_complete()
                                           │
                                    CPA status change
                                           │
                                  etcd projector write
                                           │
                                    ┌──────▼──────┐
                                    │ watch event │ ◄── cycle repeats
                                    └─────────────┘
```
Startup Recovery Without Polling¶
Question: If polling is disabled and the controller restarts, how are in-progress sessions recovered?
Answer: The startup sweep (existing mechanism) handles this. WatchTriggeredHostedService performs a one-time _reconcile_all() when it becomes leader. This lists all non-terminal sessions and reconciles each one. Sessions that were mid-pipeline will have their instantiation_progress checkpoint — the PipelineExecutor resumes from the last completed step.
```python
# In WatchTriggeredHostedService._step_up()
async def _step_up(self) -> None:
    """Called when this instance becomes leader."""
    await super()._step_up()
    # ... start watch stream ...
    # Startup sweep: reconcile all existing resources
    await self._reconcile_all()  # ← catches in-progress sessions
```
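The resume behavior can be sketched as a filter over the persisted checkpoint; this is a minimal illustration assuming progress is keyed by step name with a status string (the exact shape of `instantiation_progress` is not specified here):

```python
def remaining_steps(steps: list[dict], progress: dict[str, str]) -> list[dict]:
    """Drop steps the persisted checkpoint already marks as done.

    `progress` maps step name → status as persisted after each step;
    the executor then runs only what's left, in DAG order.
    """
    done = {
        name for name, status in progress.items()
        if status in ("completed", "skipped")
    }
    return [step for step in steps if step["name"] not in done]


# After a crash mid-pipeline, the startup sweep re-enters the executor,
# which resumes from the last completed step (step names illustrative):
steps = [{"name": "import_lab"}, {"name": "ports"}, {"name": "register_lds"}]
progress = {"import_lab": "completed", "ports": "completed"}
resume = [s["name"] for s in remaining_steps(steps, progress)]
```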
This means the system requires exactly two mechanisms:
- Startup sweep — catch everything that exists (crash recovery)
- Watch events — catch everything that changes (reactive)
No polling needed. The handler provides proactivity within a lifecycle phase.
Why Not a PipelineExecutor Loop That Mimics Polling?¶
A tempting alternative: have the PipelineExecutor run an infinite loop that periodically re-evaluates the session state, similar to a miniature polling reconciler:
```python
# ❌ Anti-pattern: executor-as-poller
async def execute_loop(self):
    while not self._cancelled:
        session = await self._fetch_current_state()
        result = await self._evaluate_and_run_next_step(session)
        if result.is_terminal:
            break
        await asyncio.sleep(5)  # mini-poll interval
```
This is wrong because:
- Unnecessary latency — The executor knows the DAG; it doesn't need to re-discover what to do next
- Redundant state fetches — Each iteration refetches session state that hasn't changed externally
- Violates separation of concerns — The executor should execute a pipeline, not poll for work
- Duplicates reconciler responsibility — The reconciler already handles "should I act?"; the executor handles "how do I act?"
The correct model is:
| Component | Responsibility | Loop? |
|---|---|---|
| Reconciler | "Should I start/check a handler?" | No loop — event-driven (watch + startup sweep) |
| LifecyclePhaseHandler | "Is the pipeline running? Start/stop it." | No loop — one-shot task |
| PipelineExecutor | "Run all steps in DAG order." | Yes — for step in dag (finite, not infinite) |
The executor's loop is a finite iteration over a DAG, not an infinite polling loop. It terminates when all steps are complete (or one fails fatally). The reconciler never polls — it reacts to etcd events. The handler is a one-shot asyncio.Task — it runs the executor once and exits.
4. Pipeline Definitions — Inline in LabletDefinition YAML¶
Pipelines are defined inline in the LabletDefinition seed YAML under an optional pipelines: key. This key maps to a new pipelines: dict | None field on LabletDefinitionState.
Schema¶
```yaml
pipelines:
  {pipeline_name}:                 # e.g., "instantiate", "collect_evidence"
    description: str               # Human-readable purpose
    trigger: "on_status:{status}"  # Which lifecycle status triggers this pipeline
    steps:
      - name: str                  # Unique step identifier
        handler: str               # Maps to _step_{handler}() method
        description: str           # Human-readable purpose
        needs: [str]               # DAG dependencies (step names)
        skip_when: str             # simpleeval expression → skip if truthy
        optional: bool             # If true, failure doesn't block downstream
        timeout_seconds: int       # asyncio.wait_for() timeout
        retry:                     # Optional retry configuration
          max_attempts: int
          delay_seconds: int
    outputs:                       # Dot-path expressions → pipeline results
      {key}: "$STEPS.{step_name}.{field}"
```
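A concrete instance of the schema might look like the following. This is an illustrative sketch only — the step names, handlers, and output fields are hypothetical; the authoritative examples are the seed files listed in Appendix B:

```yaml
pipelines:
  instantiate:
    description: "Import lab, configure ports, register LDS"
    trigger: "on_status:INSTANTIATING"
    steps:
      - name: import_lab
        handler: import_lab
        timeout_seconds: 300
        retry:
          max_attempts: 3
          delay_seconds: 10
      - name: configure_ports
        handler: configure_ports
        needs: [import_lab]
        skip_when: "not DEFINITION.port_template"
      - name: register_lds
        handler: register_lds
        needs: [import_lab]
        skip_when: "not SESSION.user_session_id"
        optional: true
    outputs:
      lab_id: "$STEPS.import_lab.lab_id"
```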
Context Variables¶
| Variable | Type | Description |
|---|---|---|
| `$SESSION` | `LabletSessionReadModel` | The session being reconciled |
| `$DEFINITION` | `LabletDefinitionState` | The definition template (read-only) |
| `$WORKER` | Worker read model | The assigned worker |
| `$STEPS` | `dict[str, dict]` | Completed step result_data (DAG outputs) |
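Resolving an `outputs` dot-path against `$STEPS` can be sketched as follows; `resolve_output` is a hypothetical helper (only the `$STEPS` root is handled, and the real resolver presumably covers the other context roots too):

```python
from typing import Any


def resolve_output(expr: str, steps: dict[str, dict[str, Any]]) -> Any:
    """Resolve a "$STEPS.{step_name}.{field}" expression against step result_data."""
    root, *path = expr.lstrip("$").split(".")
    if root != "STEPS":
        raise ValueError(f"unsupported context root: {root}")
    value: Any = steps
    for part in path:  # walk the dot-path one segment at a time
        value = value[part]
    return value


# Step names and fields below are illustrative
steps = {"import_lab": {"lab_id": "lab-42", "node_count": 7}}
lab_id = resolve_output("$STEPS.import_lab.lab_id", steps)
```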
No Pipeline = Error¶
When a session enters a status that requires a pipeline (INSTANTIATING, COLLECTING, GRADING, STOPPING) but no matching pipeline is defined in the LabletDefinition:
- The reconciler logs an error
- The session transitions to an error state with a clear message: "No '{pipeline_name}' pipeline defined. Configure pipelines on the LabletDefinition."
- The admin must add the pipeline to the definition and re-trigger
No hardcoded fallback pipeline. This forces explicit configuration.
5. Output Storage¶
Pipeline outputs are stored in two locations:
| Output Type | Storage Location | Access Path |
|---|---|---|
| Step metadata | `LabletSession.instantiation_progress` (MongoDB) | Per-step status, result_data, timestamps |
| Pipeline outputs | `LabRecord` run history (appended as run record) | Cumulative execution history per lab |
| File artifacts | S3/RustFS bucket | `s3://{bucket_name}/{session_id}/{pipeline_name}/{artifact}` |
| Score reports | `ScoreReport` aggregate (via CPA command) | Created by compute_grading pipeline |
The LabRecord gains a pipeline_runs field — an append-only list of pipeline execution records:
```python
from dataclasses import dataclass
from datetime import datetime
from typing import Any


@dataclass
class PipelineRunRecord:
    pipeline_name: str         # "instantiate", "collect_evidence", etc.
    session_id: str            # Which session triggered this run
    started_at: datetime
    completed_at: datetime | None
    status: str                # "completed" | "failed" | "cancelled"
    steps: list[dict]          # Per-step results
    outputs: dict[str, Any]    # Resolved output expressions
    error: str | None
```
6. LabletDefinition Decomposition Plan¶
The LabletDefinitionState aggregate is overloaded with 46 fields across 5+ responsibilities. This ADR initiates a decomposition:
Phase 1: Add pipelines (This ADR — Immediate)¶
Add pipelines: dict | None to LabletDefinitionState. Optional field, parsed from seed YAML.
Phase 2: Extract ContentSyncRecord (Separate ADR — Next)¶
Move content sync orchestration results out of the aggregate:
| Fields to Extract | Current Location | New Location |
|---|---|---|
| `upstream_sync_status` (unbounded blob) | `LabletDefinitionState` | `ContentSyncRecord` (child entity or separate aggregate) |
| `content_package_hash` | `LabletDefinitionState` | `ContentSyncRecord` |
| `upstream_version` | `LabletDefinitionState` | `ContentSyncRecord` |
| `upstream_date_published` | `LabletDefinitionState` | `ContentSyncRecord` |
| `upstream_instance_name` | `LabletDefinitionState` | `ContentSyncRecord` |
| `upstream_form_id` | `LabletDefinitionState` | `ContentSyncRecord` |
| `grade_xml_path` | `LabletDefinitionState` | `ContentSyncRecord` |
| `cml_yaml_path` | `LabletDefinitionState` | `ContentSyncRecord` |
| `cml_yaml_content` | `LabletDefinitionState` | `ContentSyncRecord` |
| `devices_json` | `LabletDefinitionState` | `ContentSyncRecord` |
| `last_synced_at` | `LabletDefinitionState` | `ContentSyncRecord` |
| `sync_status` | `LabletDefinitionState` | `ContentSyncRecord` |
12 fields move to ContentSyncRecord, leaving the definition with ~34 fields and a clear FK reference:
```python
# On LabletDefinitionState (after decomposition)
content_sync_record_id: str | None  # FK to ContentSyncRecord
```
Phase 3: UX Decomposition (Frontend — Follows Phase 2)¶
The Definition Details modal (see screenshot) reorganizes into:
| Tab/Section | Source | Before | After |
|---|---|---|---|
| Template | `LabletDefinitionState` | Mixed with sync data | Clean: name, version, FQN, resources, ports |
| Content Sync | `ContentSyncRecord` | `upstream_sync_status` blob rendered inline | Separate panel/tab with service-by-service status |
| Pipelines | `LabletDefinitionState.pipelines` | Not shown | New tab showing pipeline DAGs with step configs |
| Lifecycle | `LabletDefinitionState` | Status + deprecation mixed | Status badge + deprecation section |
7. simpleeval Dependency¶
Add simpleeval to lablet-controller/pyproject.toml:
Zero transitive dependencies. ~30KB. Pure Python.
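A minimal sketch of the dependency entry, assuming a PEP 621 `[project]` table (left unpinned here; pin per project convention):

```toml
[project]
dependencies = [
    # ... existing dependencies ...
    "simpleeval",
]
```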
Implementation Plan¶
Phase 1: Foundation (Sprint N)¶
| # | Task | Component | Effort |
|---|---|---|---|
| 1.1 | Add `pipelines: dict \| None` to `LabletDefinitionState` | CPA domain | S |
| 1.2 | Update `LabletDefinitionCreatedDomainEvent` to carry pipelines | CPA domain | S |
| 1.3 | Update seeder to parse pipelines from YAML | CPA infrastructure | S |
| 1.4 | Add pipelines to `LabletDefinitionDto` | CPA application | S |
| 1.5 | Add pipelines to `LabletDefinitionReadModel` + `from_dict()` (definitions flow via HTTP API, not etcd) | lcm_core | S |
| 1.6 | Two seed files with pipeline definitions (done ✅) | CPA data | — |
Phase 2: PipelineExecutor (Sprint N)¶
| # | Task | Component | Effort |
|---|---|---|---|
| 2.1 | Create `PipelineExecutor` class in lablet-controller | lablet-controller | M |
| 2.2 | Add `simpleeval` dependency | lablet-controller | S |
| 2.3 | Implement DAG resolution with topological sort | lablet-controller | M |
| 2.4 | Implement `skip_when` evaluation via `simpleeval` | lablet-controller | S |
| 2.5 | Implement step dispatch, retry, timeout | lablet-controller | M |
| 2.6 | Implement `PipelineContext` with `$SESSION`, `$DEFINITION`, `$WORKER`, `$STEPS` | lablet-controller | S |
| 2.7 | Unit tests for executor (DAG, skip, retry, timeout, context) | lablet-controller | M |
Phase 3: LifecyclePhaseHandler (Sprint N+1)¶
| # | Task | Component | Effort |
|---|---|---|---|
| 3.1 | Create `LifecyclePhaseHandler` base class | lablet-controller | M |
| 3.2 | Add per-session `asyncio.Lock` to reconciler (`_session_locks`) | lablet-controller | S |
| 3.3 | Implement `_active_handlers` management in reconciler | lablet-controller | M |
| 3.4 | Refactor `_handle_instantiating()` to delegate to handler | lablet-controller | L |
| 3.5 | Implement `_handle_collecting()` with handler delegation | lablet-controller | M |
| 3.6 | Implement `_handle_grading()` with handler delegation | lablet-controller | M |
| 3.7 | Refactor `_handle_stopping()` to use teardown pipeline | lablet-controller | M |
| 3.8 | Leader step-down: cancel all active handlers | lablet-controller | S |
| 3.9 | Unit + integration tests | lablet-controller | L |
Phase 4: Output Storage & LabRecord Integration (Sprint N+1)¶
| # | Task | Component | Effort |
|---|---|---|---|
| 4.1 | Add `PipelineRunRecord` to `LabRecord` aggregate | CPA domain | M |
| 4.2 | Create `AppendPipelineRunCommand` | CPA application | S |
| 4.3 | Wire executor completion → CPA command | lablet-controller | S |
| 4.4 | S3 artifact upload for file outputs (evidence, screenshots) | lablet-controller | M |
Phase 5: LabletDefinition Decomposition (Sprint N+2)¶
| # | Task | Component | Effort |
|---|---|---|---|
| 5.1 | Design `ContentSyncRecord` aggregate/child entity (separate ADR) | Architecture | M |
| 5.2 | Create `ContentSyncRecord` domain entity | CPA domain | M |
| 5.3 | Migrate 12 fields from `LabletDefinitionState` | CPA domain | L |
| 5.4 | Update `ContentSyncService` to write to new entity | lablet-controller | M |
| 5.5 | Update read models and DTOs | CPA application | M |
| 5.6 | Database migration script | CPA infrastructure | M |
| 5.7 | Update API endpoints | CPA api | M |
Phase 6: UX Redesign (Sprint N+2)¶
| # | Task | Component | Effort |
|---|---|---|---|
| 6.1 | Redesign Definition Details modal with tabs | CPA UI | L |
| 6.2 | Add Pipelines tab with DAG visualization | CPA UI | L |
| 6.3 | Move Content Sync to separate panel/tab | CPA UI | M |
| 6.4 | Add pipeline execution status to Session detail | CPA UI | M |
| 6.5 | Real-time SSE events for pipeline step progress | CPA + lablet-controller | M |
Consequences¶
Positive¶
- Pipeline stall eliminated: Inner loop executes all steps in one go (~30s instead of ~4.5min)
- Extensible: New pipelines added via YAML definition, not code changes
- Observable: Per-step progress, timing, and status visible in real-time
- Crash-safe: Progress persisted after each step; resumed from last checkpoint on restart
- Clean separation: Definition = template, ContentSyncRecord = operational state, Pipeline = execution
- No new scheduling paradigm: Consistent with ADR-011 — reconcilers + asyncio.Tasks
- Polling-free: Watch events + startup sweep + self-driving handlers — no periodic timer needed
- Concurrency-safe: Per-session `asyncio.Lock` prevents handler duplication from overlapping watch/poll paths
- Admin-first: No magic defaults — explicit pipeline configuration required
Negative¶
- Breaking change: Existing definitions without `pipelines` will error when sessions run
- Migration effort: Phase 5 decomposition touches many files across CPA
- simpleeval dependency: New library in lablet-controller (mitigated: zero-dep, single file)
- Handler lifecycle complexity: Managing asyncio.Tasks per session requires careful cancellation
Risks¶
| Risk | Mitigation |
|---|---|
| Handler memory leak (tasks not cleaned up) | Leader step-down cancels all; periodic GC of completed handlers |
| Per-session lock accumulation | Lazy creation + clear on step-down; locks are ~100 bytes each |
| Watch event storm during pipeline | Handler check is O(1) dict lookup — no-op reconciles are cheap |
| simpleeval security | Server-side only, expressions from admin-authored YAML, not user input |
| Pipeline YAML schema evolution | Version field on pipeline (pipeline_version: "1.0") |
| Decomposition breaks existing API clients | Backward-compatible DTO: keep fields on DTO, source from ContentSyncRecord |
Appendix A: Comparison — One-Step-Per-Reconcile vs. PipelineExecutor¶
| Aspect | ADR-031 (Current) | ADR-034 (Proposed) |
|---|---|---|
| Steps per reconcile | 1 | All (inner loop) |
| Time for 9 steps | ~4.5 min (30s × 9 polls) | ~30 sec |
| Progress persistence | ✅ After each step | ✅ After each step |
| Crash recovery | ✅ Resume from checkpoint | ✅ Resume from checkpoint |
| etcd projector needed | ✅ Required (missing = stall) | ❌ Not needed within pipeline (self-driving) |
| Polling required | ✅ Fallback drives progress | ❌ Watch + startup sweep only |
| Concurrency safety | ⚠️ Soft `in_progress` flag | ✅ Per-session `asyncio.Lock` |
| Reconciler complexity | Medium (dispatch per step) | Low (delegate to handler) |
| Pipeline definition | Hardcoded in reconciler | YAML in LabletDefinition |
| New pipeline effort | New code in reconciler | New YAML + step handlers |
Appendix B: Full Pipeline YAML Examples¶
See seed files:
- `data/seeds/lablet_definitions/exam-associate-auto-v1.1-lab-2.5.1.yaml` — Full LDS pipeline (4 pipelines, 9+4+3+4 steps)
- `data/seeds/lablet_definitions/exam-professional-enterprise-v1.0-lab-1.1.yaml` — No-LDS pipeline (3 pipelines, `skip_when` pruning)