
CML Worker Application Commands

Last Updated: November 17, 2025 | Status: ✅ Current

Overview

This document summarizes the application commands responsible for CML Worker lifecycle management. Each command:

  • Operates through the CMLWorker aggregate (CQRS write side)
  • Publishes domain events that are broadcast to the UI over Server-Sent Events (SSE), e.g., worker.created, worker.status.updated, worker.terminated
  • Calls the AWS EC2 API through the AwsEc2Client integration wrapper
  • Emits metrics & traces (OpenTelemetry) and handles AWS-specific exceptions

Related real-time behavior:

| Domain Event                         | SSE Event              | Trigger Source                                 |
|--------------------------------------|------------------------|------------------------------------------------|
| CMLWorkerCreatedDomainEvent          | worker.created         | CreateCMLWorkerCommand                         |
| CMLWorkerStatusUpdatedDomainEvent    | worker.status.updated  | UpdateCMLWorkerStatusCommand / monitoring jobs |
| CMLWorkerTerminatedDomainEvent       | worker.terminated      | TerminateCMLWorkerCommand                      |
| CMLWorkerTelemetryUpdatedDomainEvent | worker.metrics.updated | Metrics collection job                         |

Labs synchronization events (worker.labs.updated) originate from RefreshWorkerLabsCommand and the recurring LabsRefreshJob.
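The mapping in the table above can be pictured as a lookup followed by standard text/event-stream framing (an event: line, a data: line, then a blank line). This is a minimal sketch, assuming the SSE layer keys off the domain event's class name and serializes payloads as compact JSON; DOMAIN_TO_SSE_EVENT and to_sse_frame are hypothetical names, not the project's API.

```python
import json

# Hypothetical lookup mirroring the table: domain event class name -> SSE event type.
DOMAIN_TO_SSE_EVENT: dict[str, str] = {
    "CMLWorkerCreatedDomainEvent": "worker.created",
    "CMLWorkerStatusUpdatedDomainEvent": "worker.status.updated",
    "CMLWorkerTerminatedDomainEvent": "worker.terminated",
    "CMLWorkerTelemetryUpdatedDomainEvent": "worker.metrics.updated",
}


def to_sse_frame(domain_event_name: str, payload: dict) -> str:
    """Render one SSE frame: an event: line, a data: line, and a terminating blank line."""
    event_type = DOMAIN_TO_SSE_EVENT[domain_event_name]
    # Compact single-line JSON keeps the whole payload on one data: line.
    data = json.dumps(payload, separators=(",", ":"))
    return f"event: {event_type}\ndata: {data}\n\n"


print(to_sse_frame("CMLWorkerTerminatedDomainEvent", {"worker_id": "w-1"}), end="")
# event: worker.terminated
# data: {"worker_id":"w-1"}
```

On the browser side, an EventSource listener registered for "worker.terminated" would receive the data line as its message payload.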


Commands Created

1. CreateCMLWorkerCommand

File: src/application/commands/create_cml_worker_command.py

Purpose: Create a new CML Worker and provision AWS EC2 instance

Flow:

  1. Resolves the AMI: uses ami_id if provided, otherwise the AMI configured in settings for the region
  2. Creates CMLWorker domain aggregate (PENDING status)
  3. Provisions EC2 instance via aws_ec2_client.create_instance()
  4. Assigns instance to worker aggregate with worker.assign_instance()
  5. Updates worker status based on instance state
  6. Saves to repository (publishes domain events)
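The create flow can be sketched end to end with in-memory stand-ins. This is an illustration under stated assumptions, not the command handler itself: CMLWorker, FakeEc2Client, InMemoryRepository, the create_instance(ami_id=..., instance_type=...) signature and its returned dict, and repository.add_async are all hypothetical.

```python
import asyncio
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class CMLWorkerStatus(Enum):
    PENDING = "pending"
    STARTING = "starting"
    RUNNING = "running"


@dataclass
class CMLWorker:
    """Hypothetical stand-in for the CMLWorker aggregate."""
    name: str
    aws_region: str
    status: CMLWorkerStatus = CMLWorkerStatus.PENDING
    instance_id: Optional[str] = None

    def assign_instance(self, instance_id: str) -> None:
        self.instance_id = instance_id


async def create_worker(name, aws_region, instance_type, ami_id, ami_ids, ec2_client, repository):
    # 1. An explicit ami_id wins; otherwise use the per-region AMI from settings.
    resolved_ami = ami_id or ami_ids.get(aws_region)
    if not resolved_ami:
        raise ValueError(f"No AMI configured for region {aws_region}")
    # 2. A new aggregate starts in PENDING.
    worker = CMLWorker(name=name, aws_region=aws_region)
    # 3. Provision the instance (hypothetical client signature and return shape).
    instance = await ec2_client.create_instance(ami_id=resolved_ami, instance_type=instance_type)
    # 4. Attach the instance to the aggregate.
    worker.assign_instance(instance["instance_id"])
    # 5. A fresh EC2 instance is normally "pending", which maps to STARTING.
    worker.status = CMLWorkerStatus.RUNNING if instance["state"] == "running" else CMLWorkerStatus.STARTING
    # 6. Persist; the real repository publishes the aggregate's domain events here.
    await repository.add_async(worker)
    return worker


class FakeEc2Client:
    async def create_instance(self, ami_id: str, instance_type: str) -> dict:
        return {"instance_id": "i-0abc", "state": "pending"}


class InMemoryRepository:
    def __init__(self) -> None:
        self.items: list = []

    async def add_async(self, worker: CMLWorker) -> None:
        self.items.append(worker)


repo = InMemoryRepository()
worker = asyncio.run(create_worker(
    "cml-worker-prod-01", "us-east-1", "c5.2xlarge", None,
    {"us-east-1": "ami-0123456789abcdef0"}, FakeEc2Client(), repo,
))
print(worker.instance_id, worker.status.name)  # i-0abc STARTING
```

Keeping the AMI fallback inside the handler lets callers pin an AMI explicitly while routine requests pick up the region default from settings.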

Parameters:

  • name: Worker name
  • aws_region: AWS region
  • instance_type: EC2 instance type
  • ami_id: Optional AMI ID (uses settings if not provided)
  • ami_name: Optional AMI name
  • cml_version: Optional CML version
  • created_by: User who created the worker

Returns: CMLWorkerInstanceDto with instance details

Exception Handling:

  • EC2InvalidParameterException: Invalid AMI or parameters
  • EC2QuotaExceededException: AWS instance limit reached
  • EC2AuthenticationException: Invalid credentials
  • EC2InstanceCreationException: Instance creation failed

2. StartCMLWorkerCommand

File: src/application/commands/start_cml_worker_command.py

Purpose: Start a stopped CML Worker EC2 instance

Flow:

  1. Retrieves worker from repository
  2. Validates worker has AWS instance assigned
  3. Checks current status (skip if already running, error if terminated)
  4. Starts EC2 instance via aws_ec2_client.start_instance()
  5. Updates worker status to STARTING
  6. Saves to repository (publishes domain events)
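The precondition check in step 3 is what makes the command safe to repeat. A minimal sketch, assuming a dedicated error type for terminated workers; the CMLWorkerStatus values, WorkerTerminatedError and should_call_start are hypothetical.

```python
from enum import Enum
from typing import Optional


class CMLWorkerStatus(Enum):
    # Hypothetical subset of statuses for this sketch.
    STARTING = "starting"
    RUNNING = "running"
    STOPPING = "stopping"
    STOPPED = "stopped"
    TERMINATED = "terminated"


class WorkerTerminatedError(Exception):
    """Hypothetical error type: a terminated worker cannot be started again."""


def should_call_start(status: CMLWorkerStatus, instance_id: Optional[str]) -> bool:
    """Apply the start preconditions; True means the EC2 start call should be made."""
    if instance_id is None:
        raise ValueError("Worker has no AWS instance assigned")
    if status is CMLWorkerStatus.TERMINATED:
        raise WorkerTerminatedError("Cannot start a terminated worker")
    # Already running: skip the AWS call so repeated start requests are harmless.
    return status is not CMLWorkerStatus.RUNNING


print(should_call_start(CMLWorkerStatus.STOPPED, "i-0abc"))  # True
print(should_call_start(CMLWorkerStatus.RUNNING, "i-0abc"))  # False
```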

Parameters:

  • worker_id: CML Worker ID
  • started_by: Optional user who started the worker

Returns: bool (True if started successfully)

Exception Handling:

  • EC2InstanceNotFoundException: Instance not found
  • EC2InstanceOperationException: Start operation failed
  • EC2AuthenticationException: Invalid credentials

3. StopCMLWorkerCommand

File: src/application/commands/stop_cml_worker_command.py

Purpose: Stop a running CML Worker EC2 instance

Flow:

  1. Retrieves worker from repository
  2. Validates worker has AWS instance assigned
  3. Checks current status (skip if already stopped, error if terminated)
  4. Stops EC2 instance via aws_ec2_client.stop_instance()
  5. Updates worker status to STOPPING
  6. Saves to repository (publishes domain events)
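The six stop steps map onto a short handler body. This sketch swaps the real aggregate, AwsEc2Client and repository for in-memory fakes; Worker, RecordingEc2Client, InMemoryWorkers and its get_async/update_async methods are hypothetical, and returning True for an already-stopped worker is an assumption the document does not state.

```python
import asyncio
from dataclasses import dataclass
from typing import Optional


@dataclass
class Worker:
    """Hypothetical stand-in for the CMLWorker aggregate (status kept as a plain string)."""
    id: str
    instance_id: Optional[str]
    status: str


class RecordingEc2Client:
    """Fake AwsEc2Client that records stop calls instead of contacting AWS."""
    def __init__(self) -> None:
        self.stopped: list[str] = []

    async def stop_instance(self, instance_id: str) -> None:
        self.stopped.append(instance_id)


class InMemoryWorkers:
    """Fake repository; get_async/update_async are hypothetical method names."""
    def __init__(self, *workers: Worker) -> None:
        self._by_id = {w.id: w for w in workers}
        self.saved: list[str] = []

    async def get_async(self, worker_id: str) -> Optional[Worker]:
        return self._by_id.get(worker_id)

    async def update_async(self, worker: Worker) -> None:
        self.saved.append(worker.id)


async def stop_worker(worker_id: str, ec2_client, repository) -> bool:
    worker = await repository.get_async(worker_id)              # 1. retrieve
    if worker is None or worker.instance_id is None:             # 2. validate
        raise LookupError(f"Worker {worker_id} not found or has no instance")
    if worker.status == "terminated":                            # 3. error if terminated
        raise RuntimeError("Cannot stop a terminated worker")
    if worker.status == "stopped":                               # 3. skip if already stopped
        return True  # assumption: an already-stopped worker counts as success
    await ec2_client.stop_instance(worker.instance_id)           # 4. stop the EC2 instance
    worker.status = "stopping"                                   # 5. transitional status
    await repository.update_async(worker)                        # 6. save (events published)
    return True


ec2 = RecordingEc2Client()
repo = InMemoryWorkers(Worker("w-1", "i-0abc", "running"))
print(asyncio.run(stop_worker("w-1", ec2, repo)), ec2.stopped, repo.saved)
# True ['i-0abc'] ['w-1']
```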

Parameters:

  • worker_id: CML Worker ID
  • stopped_by: Optional user who stopped the worker

Returns: bool (True if stopped successfully)

Exception Handling:

  • EC2InstanceNotFoundException: Instance not found
  • EC2InstanceOperationException: Stop operation failed
  • EC2AuthenticationException: Invalid credentials

4. TerminateCMLWorkerCommand

File: src/application/commands/terminate_cml_worker_command.py

Purpose: Terminate CML Worker and permanently delete EC2 instance

Flow:

  1. Retrieves worker from repository
  2. If worker has AWS instance, terminates it via aws_ec2_client.terminate_instance()
  3. Handles instance-not-found gracefully (logs warning, continues)
  4. Marks worker as terminated via worker.terminate()
  5. Saves to repository (publishes domain events)

Parameters:

  • worker_id: CML Worker ID
  • terminated_by: Optional user who terminated the worker

Returns: bool (True if terminated successfully)

Exception Handling:

  • EC2InstanceNotFoundException: Logged as warning, continues (instance already gone)
  • EC2InstanceOperationException: Terminate operation failed
  • EC2AuthenticationException: Invalid credentials

Warning: This is a destructive operation that cannot be undone!
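The graceful not-found handling in step 3 is what lets termination succeed even when the instance was already deleted outside the application. A minimal sketch, assuming the integration exception can be caught by type; the local EC2InstanceNotFoundException class, Worker and AlreadyGoneClient are stand-ins, not the project's classes.

```python
import asyncio
import logging
from dataclasses import dataclass
from typing import Optional

logger = logging.getLogger("terminate_sketch")


class EC2InstanceNotFoundException(Exception):
    """Local stand-in for the integration-layer exception of the same name."""


@dataclass
class Worker:
    """Hypothetical stand-in for the CMLWorker aggregate."""
    id: str
    instance_id: Optional[str]
    terminated: bool = False

    def terminate(self) -> None:
        self.terminated = True


async def terminate_worker(worker: Worker, ec2_client) -> bool:
    if worker.instance_id is not None:
        try:
            await ec2_client.terminate_instance(worker.instance_id)
        except EC2InstanceNotFoundException:
            # The instance is already gone, which is the desired end state: warn and continue.
            logger.warning("Instance %s not found; terminating worker %s anyway",
                           worker.instance_id, worker.id)
        # Any other exception propagates and leaves the worker untouched.
    worker.terminate()
    # Saving the aggregate (publishing CMLWorkerTerminatedDomainEvent) would follow here.
    return True


class AlreadyGoneClient:
    """Fake client for an instance that was deleted outside the application."""
    async def terminate_instance(self, instance_id: str) -> None:
        raise EC2InstanceNotFoundException(instance_id)


worker = Worker(id="w-1", instance_id="i-0gone")
print(asyncio.run(terminate_worker(worker, AlreadyGoneClient())), worker.terminated)  # True True
```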


5. UpdateCMLWorkerTagsCommand

File: src/application/commands/update_cml_worker_tags_command.py

Purpose: Add or update tags on CML Worker EC2 instance

Flow:

  1. Retrieves worker from repository
  2. Validates worker has AWS instance assigned
  3. Adds/updates tags via aws_ec2_client.add_tags()
  4. Retrieves all tags to return updated state
  5. Returns complete tag dictionary

Parameters:

  • worker_id: CML Worker ID
  • tags: Dictionary of tag key-value pairs
  • updated_by: Optional user who updated tags

Returns: dict[str, str] with all instance tags

Exception Handling:

  • EC2InstanceNotFoundException: Instance not found
  • EC2TagOperationException: Tag operation failed (e.g., 50-tag limit)
  • EC2AuthenticationException: Invalid credentials

Use Case: Organize resources, track costs, add metadata
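Because EC2TagOperationException surfaces only after the AWS call, a client-side pre-check could reject oversized tag sets earlier. This is a hypothetical helper, not part of the command: it applies the AWS tag limits (50 user tags per resource, keys up to 128 characters, values up to 256, the aws: prefix reserved and not counted) and merges tags the way EC2 CreateTags does, overwriting existing keys.

```python
MAX_USER_TAGS = 50       # per-resource limit on user-defined tags
MAX_KEY_LENGTH = 128     # characters
MAX_VALUE_LENGTH = 256   # characters


def _is_reserved(key: str) -> bool:
    # The aws: prefix is reserved for tags created by AWS itself.
    return key.lower().startswith("aws:")


def merge_and_validate_tags(existing: dict[str, str], updates: dict[str, str]) -> dict[str, str]:
    """Return the tag set after applying updates, or raise ValueError if it would break a limit."""
    for key, value in updates.items():
        if _is_reserved(key):
            raise ValueError(f"Tag key {key!r} uses the reserved 'aws:' prefix")
        if not 1 <= len(key) <= MAX_KEY_LENGTH:
            raise ValueError(f"Tag key {key!r} must be 1-{MAX_KEY_LENGTH} characters long")
        if len(value) > MAX_VALUE_LENGTH:
            raise ValueError(f"Value for tag {key!r} exceeds {MAX_VALUE_LENGTH} characters")
    # Like EC2 CreateTags: an existing key gets its value overwritten, a new key is added.
    merged = {**existing, **updates}
    user_tag_count = sum(1 for key in merged if not _is_reserved(key))  # aws: tags don't count
    if user_tag_count > MAX_USER_TAGS:
        raise ValueError(f"{user_tag_count} tags would exceed the {MAX_USER_TAGS}-tag limit")
    return merged


print(merge_and_validate_tags({"env": "dev"}, {"env": "prod", "owner": "user-123"}))
# {'env': 'prod', 'owner': 'user-123'}
```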


6. UpdateCMLWorkerStatusCommand

File: src/application/commands/update_cml_worker_status_command.py

Purpose: Sync CML Worker status from AWS EC2 (status reconciliation)

Flow:

  1. Retrieves worker from repository
  2. Queries EC2 instance status via aws_ec2_client.get_instance_status_checks()
  3. Maps the EC2 state to CMLWorkerStatus:
       • running → RUNNING
       • stopped → STOPPED
       • stopping → STOPPING
       • pending → STARTING
       • shutting-down → STOPPING
       • terminated → calls worker.terminate()
  4. Updates worker status if changed
  5. Saves to repository if status changed (publishes domain events)
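The state mapping in step 3 fits naturally in a lookup table. A sketch, assuming the reconciliation decision is kept separate from the aggregate update; EC2_STATE_TO_WORKER_STATUS, reconcile and the enum values are hypothetical, while the six keys are the EC2 instance state names.

```python
from enum import Enum
from typing import Optional


class CMLWorkerStatus(Enum):
    STARTING = "starting"
    RUNNING = "running"
    STOPPING = "stopping"
    STOPPED = "stopped"


# EC2 state name -> worker status. None marks "terminated", which is handled by
# calling worker.terminate() rather than by setting a status directly.
EC2_STATE_TO_WORKER_STATUS: dict[str, Optional[CMLWorkerStatus]] = {
    "pending": CMLWorkerStatus.STARTING,
    "running": CMLWorkerStatus.RUNNING,
    "stopping": CMLWorkerStatus.STOPPING,
    "shutting-down": CMLWorkerStatus.STOPPING,
    "stopped": CMLWorkerStatus.STOPPED,
    "terminated": None,
}


def reconcile(current: CMLWorkerStatus, ec2_state: str) -> tuple[str, Optional[CMLWorkerStatus]]:
    """Return ("terminate", None), ("update", new_status) or ("noop", current)."""
    if ec2_state not in EC2_STATE_TO_WORKER_STATUS:
        raise ValueError(f"Unknown EC2 state: {ec2_state}")
    target = EC2_STATE_TO_WORKER_STATUS[ec2_state]
    if target is None:
        return ("terminate", None)
    if target is current:
        return ("noop", current)  # unchanged: skip the save, so no domain event fires
    return ("update", target)


print(reconcile(CMLWorkerStatus.STARTING, "running"))
# ('update', <CMLWorkerStatus.RUNNING: 'running'>)
```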

Parameters:

  • worker_id: CML Worker ID

Returns: dict[str, str] with status check information (instance_state, instance_status_check, ec2_system_status_check)

Exception Handling:

  • EC2InstanceNotFoundException: Marks worker as terminated (instance gone from AWS)
  • EC2StatusCheckException: Status check retrieval failed
  • EC2AuthenticationException: Invalid credentials

Use Case: Background job & manual reconciliation to keep worker state in sync with AWS


Integration Points

Domain Layer

All commands interact with:

  • CMLWorker aggregate: Domain entity with business logic
  • CMLWorkerRepository: Abstract repository for persistence
  • Domain events: Published when the aggregate is saved after a state change

Integration Layer

All commands use:

  • AwsEc2Client: AWS EC2 API wrapper
  • Specific exceptions: EC2InvalidParameterException, EC2InstanceNotFoundException, etc.
  • CMLWorkerInstanceDto: Data transfer object for AWS responses

Application Settings

Commands use these settings:

  • cml_worker_ami_ids: Dict of AMI IDs per region
  • cml_worker_ami_names: Dict of AMI names per region
  • cml_worker_security_group_ids: Security group IDs
  • cml_worker_subnet_id: VPC subnet ID
  • cml_worker_key_name: SSH key pair name
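A typed container makes the expected shape of these settings explicit. A sketch with hypothetical defaults and example IDs: the field names come from the list above, while CmlWorkerSettings and missing_for_region are assumptions. It treats AMI names as optional (matching the optional ami_name parameter) and the other fields as required, which may be stricter than the real service; the per-region AMI matters only when the command omits ami_id.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class CmlWorkerSettings:
    """Hypothetical container for the settings listed above (field names from this doc)."""
    cml_worker_ami_ids: dict[str, str] = field(default_factory=dict)    # region -> AMI ID
    cml_worker_ami_names: dict[str, str] = field(default_factory=dict)  # region -> AMI name
    cml_worker_security_group_ids: list[str] = field(default_factory=list)
    cml_worker_subnet_id: str = ""
    cml_worker_key_name: str = ""

    def missing_for_region(self, region: str) -> list[str]:
        """Name required settings still empty for region (AMI names are treated as optional)."""
        missing = []
        if region not in self.cml_worker_ami_ids:
            missing.append(f"cml_worker_ami_ids[{region!r}]")
        if not self.cml_worker_security_group_ids:
            missing.append("cml_worker_security_group_ids")
        if not self.cml_worker_subnet_id:
            missing.append("cml_worker_subnet_id")
        if not self.cml_worker_key_name:
            missing.append("cml_worker_key_name")
        return missing


settings = CmlWorkerSettings(
    cml_worker_ami_ids={"us-east-1": "ami-0123456789abcdef0"},
    cml_worker_security_group_ids=["sg-0123456789abcdef0"],
    cml_worker_subnet_id="subnet-0123456789abcdef0",
)
print(settings.missing_for_region("us-east-1"))  # ['cml_worker_key_name']
```

Running such a check at startup would surface a misconfigured region before the first create request rather than as an EC2InvalidParameterException.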

CQRS Pattern & Real-Time Flow

sequenceDiagram
   participant UI
   participant API
   participant CommandHandler
   participant DomainEvents
   participant SSE

   UI->>API: POST /api/region/{r}/workers/{id}/stop
   API->>CommandHandler: StopCMLWorkerCommand
   CommandHandler->>DomainEvents: emit CMLWorkerStatusUpdatedDomainEvent
   DomainEvents->>SSE: broadcast worker.status.updated
   SSE-->>UI: EventSource message (update row, modal)

The same flow applies to every lifecycle command, so backend state changes appear in the UI almost immediately, without a manual refresh.
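On the SSE side, each connected client needs its own buffer so that one slow consumer does not hold up the others. A minimal in-process sketch using one asyncio.Queue per subscriber, assuming a single API process; SseBroadcaster is a hypothetical name, and a multi-process deployment would likely need a shared message broker instead.

```python
import asyncio


class SseBroadcaster:
    """Hypothetical in-process fan-out: each connected EventSource client owns one queue."""

    def __init__(self) -> None:
        self._subscribers: set[asyncio.Queue] = set()

    def subscribe(self) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        self._subscribers.add(queue)
        return queue

    def unsubscribe(self, queue: asyncio.Queue) -> None:
        self._subscribers.discard(queue)  # call when the client disconnects

    def broadcast(self, frame: str) -> None:
        # Invoked by the domain-event handler; put_nowait never blocks on an unbounded queue.
        for queue in self._subscribers:
            queue.put_nowait(frame)


async def demo() -> list[str]:
    broadcaster = SseBroadcaster()
    table_view, detail_modal = broadcaster.subscribe(), broadcaster.subscribe()
    broadcaster.broadcast('event: worker.status.updated\ndata: {"status":"stopping"}\n\n')
    return [await table_view.get(), await detail_modal.get()]


frames = asyncio.run(demo())
print(len(frames))  # 2
```

Each SSE endpoint would then loop on its own queue's get() and write the frames to the HTTP response stream.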

All commands follow the Neuroglia CQRS pattern:

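from dataclasses import dataclass

# Command, CommandHandler and OperationResult come from the Neuroglia framework
# (exact import paths depend on the installed version). CommandHandlerBase is the
# project's shared base in src/application/commands/command_handler_base.py.
# TReturn is a placeholder for the command's result type, e.g. bool or dict[str, str].


@dataclass
class MyCommand(Command[OperationResult[TReturn]]):
    """Command definition: the command's input parameters"""
    param1: str
    param2: int


class MyCommandHandler(
    CommandHandlerBase,
    CommandHandler[MyCommand, OperationResult[TReturn]],
):
    """Command handler: executes MyCommand and wraps the outcome in an OperationResult"""

    async def handle_async(self, request: MyCommand) -> OperationResult[TReturn]:
        # Implementation: load the aggregate, call AWS, save, return the result
        ...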
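To show what send_async does with such a command, here is a toy mediator that routes each command to the handler registered for its type. It is not Neuroglia's implementation; Mediator, PingCommand and PingCommandHandler are hypothetical.

```python
import asyncio
from dataclasses import dataclass


class Mediator:
    """Toy mediator: routes each command instance to the handler registered for its type."""

    def __init__(self) -> None:
        self._handlers: dict[type, object] = {}

    def register(self, command_type: type, handler: object) -> None:
        self._handlers[command_type] = handler

    async def send_async(self, command: object):
        handler = self._handlers.get(type(command))
        if handler is None:
            raise LookupError(f"No handler registered for {type(command).__name__}")
        return await handler.handle_async(command)


@dataclass
class PingCommand:
    worker_id: str


class PingCommandHandler:
    async def handle_async(self, request: PingCommand) -> str:
        return f"pong:{request.worker_id}"


mediator = Mediator()
mediator.register(PingCommand, PingCommandHandler())
print(asyncio.run(mediator.send_async(PingCommand("w-1"))))  # pong:w-1
```

The caller depends only on the mediator and the command type, which is why the usage examples below need nothing beyond mediator.send_async(command).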

Observability

All commands include:

  1. OpenTelemetry Tracing:
       • Automatic span from the CQRS middleware
       • Custom spans for each operation phase
       • Attributes for debugging (worker_id, instance_id, status, etc.)
  2. Structured Logging:
       • Info logs for success
       • Error logs with exception details
       • Warning logs for non-critical issues
  3. Error Context:
       • Specific exception types for different error scenarios
       • Error messages include relevant IDs and context
       • Stack traces for unexpected errors
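For the structured-logging item, one way to get machine-readable logs carrying the debugging attributes above is a JSON formatter on the standard logging module. A sketch only: the project's actual logging setup is not described here, the formatter and logger names are hypothetical, and correlation with OpenTelemetry trace IDs is not shown.

```python
import io
import json
import logging


class JsonFormatter(logging.Formatter):
    """Render each record as one JSON object, including selected context fields."""

    CONTEXT_FIELDS = ("worker_id", "instance_id", "status")

    def format(self, record: logging.LogRecord) -> str:
        entry = {"level": record.levelname, "logger": record.name, "message": record.getMessage()}
        for name in self.CONTEXT_FIELDS:
            if hasattr(record, name):  # keys passed via extra= become record attributes
                entry[name] = getattr(record, name)
        if record.exc_info:  # set by log.exception(...) inside an except block
            entry["exc_info"] = self.formatException(record.exc_info)
        return json.dumps(entry)


stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(JsonFormatter())
log = logging.getLogger("cml.commands.sketch")
log.addHandler(handler)
log.setLevel(logging.INFO)
log.propagate = False  # keep the sketch's output out of the root logger

log.info("Stop requested", extra={"worker_id": "w-1", "instance_id": "i-0abc"})
print(stream.getvalue().strip())
# {"level": "INFO", "logger": "cml.commands.sketch", "message": "Stop requested", "worker_id": "w-1", "instance_id": "i-0abc"}
```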

Usage Examples

Create Worker

command = CreateCMLWorkerCommand(
    name="cml-worker-prod-01",
    aws_region="us-east-1",
    instance_type="c5.2xlarge",
    cml_version="2.6.1",
    created_by="user-123",
)
result = await mediator.send_async(command)

Start Worker

command = StartCMLWorkerCommand(
    worker_id="550e8400-e29b-41d4-a716-446655440000",
    started_by="user-123",
)
result = await mediator.send_async(command)

Sync Status (Background Job)

# Run periodically to keep workers in sync with AWS
for worker in await repository.get_all_async():
    command = UpdateCMLWorkerStatusCommand(worker_id=worker.id())
    await mediator.send_async(command)
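The loop above awaits each command in turn, so one slow AWS call delays every later worker. A bounded-concurrency variant using asyncio.Semaphore and asyncio.gather(..., return_exceptions=True) is sketched below; sync_all and fake_send are hypothetical, and in the real job send would wrap mediator.send_async(UpdateCMLWorkerStatusCommand(worker_id=...)).

```python
import asyncio
from typing import Awaitable, Callable, Iterable


async def sync_all(
    worker_ids: Iterable[str],
    send: Callable[[str], Awaitable[object]],
    max_concurrency: int = 5,
) -> dict[str, object]:
    """Sync every worker with at most max_concurrency calls in flight; one failure doesn't stop the rest."""
    ids = list(worker_ids)
    semaphore = asyncio.Semaphore(max_concurrency)

    async def sync_one(worker_id: str) -> object:
        async with semaphore:
            return await send(worker_id)

    # return_exceptions=True stores each raised exception in the results instead of aborting.
    results = await asyncio.gather(*(sync_one(w) for w in ids), return_exceptions=True)
    return dict(zip(ids, results))


in_flight = peak = 0


async def fake_send(worker_id: str) -> str:
    """Stand-in for mediator.send_async(UpdateCMLWorkerStatusCommand(worker_id=...))."""
    global in_flight, peak
    in_flight += 1
    peak = max(peak, in_flight)
    await asyncio.sleep(0.01)
    in_flight -= 1
    if worker_id == "w-3":
        raise RuntimeError("EC2 status check failed")
    return "ok"


results = asyncio.run(sync_all([f"w-{i}" for i in range(10)], fake_send, max_concurrency=3))
print(peak, type(results["w-3"]).__name__)  # 3 RuntimeError
```

The semaphore caps simultaneous EC2 API calls, which helps stay under AWS request-rate limits as the worker count grows.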

File Structure

src/application/commands/
├── __init__.py                          # Exports all commands
├── command_handler_base.py              # Base class
├── create_cml_worker_command.py         # ✅ New
├── start_cml_worker_command.py          # ✅ New
├── stop_cml_worker_command.py           # ✅ New
├── terminate_cml_worker_command.py      # ✅ New
├── update_cml_worker_tags_command.py    # ✅ New
├── update_cml_worker_status_command.py  # ✅ New
├── create_task_command.py               # Existing
├── update_task_command.py               # Existing
└── delete_task_command.py               # Existing

Next Steps

  1. Create Queries:
       • GetCMLWorkerQuery - Retrieve worker by ID
       • ListCMLWorkersQuery - List all workers with filtering
       • GetCMLWorkerMetricsQuery - Get worker telemetry
  2. API Controllers:
       • Add endpoints to src/api/controllers/workers_controller.py
       • Map commands to HTTP POST/PUT endpoints
       • Handle authentication and authorization
  3. Background Jobs:
       • Status sync job (every 5 minutes)
       • Idle worker detection (check telemetry)
       • Auto-stop/terminate for cost optimization
  4. Tests:
       • Unit tests for each command handler
       • Mock AWS client responses
       • Test exception handling paths
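For the test items, unittest.mock.AsyncMock can stand in for an awaited AWS client method, and its side_effect can raise an exception to exercise error paths. A sketch with a hypothetical handler fragment (start_instance_safely) and a local exception stand-in; returning False on not-found is a simplification for the example, not the documented behavior of StartCMLWorkerCommand.

```python
import unittest
from unittest.mock import AsyncMock, Mock


class EC2InstanceNotFoundException(Exception):
    """Local stand-in for the integration-layer exception."""


async def start_instance_safely(ec2_client, instance_id: str) -> bool:
    """Hypothetical handler fragment under test: False when AWS reports the instance missing."""
    try:
        await ec2_client.start_instance(instance_id)
    except EC2InstanceNotFoundException:
        return False
    return True


class StartInstanceTests(unittest.IsolatedAsyncioTestCase):
    async def test_success_calls_aws_once(self):
        ec2 = Mock()
        ec2.start_instance = AsyncMock()  # only the awaited method needs to be an AsyncMock
        self.assertTrue(await start_instance_safely(ec2, "i-0abc"))
        ec2.start_instance.assert_awaited_once_with("i-0abc")

    async def test_missing_instance_returns_false(self):
        ec2 = Mock()
        ec2.start_instance = AsyncMock(side_effect=EC2InstanceNotFoundException("i-0abc"))
        self.assertFalse(await start_instance_safely(ec2, "i-0abc"))
```

Run it with python -m unittest; no AWS credentials or network access are needed because the client is fully mocked.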

Summary

✅ 6 commands created for complete CML Worker lifecycle management
✅ Domain-driven design with proper aggregate boundaries
✅ Clean architecture with separation of concerns
✅ Specific exception handling for better error management
✅ Full observability with tracing and logging
✅ CQRS pattern following Neuroglia framework

The command side of the application layer for CML Worker management is complete! 🎯