Unit of Work Pattern
⚠️ DEPRECATED: This pattern has been superseded by repository-based event publishing where the command handler serves as the transaction boundary. See Persistence Patterns for the current recommended approach.
Migration Note: The framework no longer requires explicit UnitOfWork usage. Domain events are now automatically published by repositories when persisting aggregates, eliminating the need for manual event registration and coordination.
Historical Context
This documentation is preserved for reference and migration purposes.
Estimated reading time: 25 minutes
The Unit of Work pattern maintained a list of objects affected by a business transaction and coordinated the writing out of changes while resolving concurrency problems. In earlier versions of Neuroglia, it provided automatic domain event collection and dispatching.
Current Approach (Recommended)
Instead of Unit of Work, use the repository-based pattern:
# ✅ CURRENT PATTERN: Repository handles event publishing
class CreateOrderHandler(CommandHandler[CreateOrderCommand, OperationResult[OrderDto]]):
def __init__(self, order_repository: OrderRepository):
self.order_repository = order_repository
async def handle_async(self, command: CreateOrderCommand):
# 1. Create order (raises domain events internally)
order = Order.create(command.customer_id, command.items)
# 2. Save order - repository automatically publishes events
await self.order_repository.save_async(order)
# Repository does:
        # ✅ Saves order state
        # ✅ Gets uncommitted events
        # ✅ Publishes events to event bus
        # ✅ Clears events from aggregate
return self.created(order)
See Persistence Patterns for detailed documentation.
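For reference, the repository's side of this contract might look roughly like the sketch below. The class name and the _persist_state_async hook are illustrative placeholders, not framework APIs; only the event-handling steps mirror the comments in the example above.

class EventPublishingOrderRepository(OrderRepository):
    """Illustrative sketch: publishes domain events as part of save_async."""

    def __init__(self, event_bus: EventBus):
        self.event_bus = event_bus

    async def save_async(self, order: Order) -> None:
        # 1. Persist the aggregate's state (storage-specific call, assumed here)
        await self._persist_state_async(order)
        # 2. Collect the uncommitted domain events raised by the aggregate
        events = order.get_uncommitted_events()
        # 3. Publish each event to the event bus
        for event in events:
            await self.event_bus.publish_async(event)
        # 4. Clear the events so they are not published twice
        order.mark_events_as_committed()

    async def _persist_state_async(self, order: Order) -> None:
        ...  # hypothetical hook for the concrete storage implementation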
What & Why (Historical)
❌ The Problem: Manual Event Management and Inconsistent Transactions
Without Unit of Work, managing domain events and transactional consistency is manual and error-prone:
# ❌ PROBLEM: Manual event management and inconsistent transactions
class CreateOrderHandler(CommandHandler[CreateOrderCommand, OperationResult[OrderDto]]):
def __init__(self,
order_repository: OrderRepository,
event_bus: EventBus):
self.order_repository = order_repository
self.event_bus = event_bus
async def handle_async(self, command: CreateOrderCommand):
# Create order (raises domain events)
order = Order.create(command.customer_id, command.items)
# Save order
await self.order_repository.save_async(order)
# PROBLEM: Must manually extract and publish events!
events = order.get_uncommitted_events()
for event in events:
await self.event_bus.publish_async(event)
order.mark_events_as_committed()
# PROBLEMS:
        # ❌ What if save succeeds but event publishing fails?
        # ❌ Events published even if transaction rolls back!
        # ❌ Must remember to publish events in EVERY handler!
        # ❌ Easy to forget to mark events as committed!
        # ❌ No coordination between multiple aggregates!
return self.created(order)
# Another handler with the SAME manual code!
class ConfirmOrderHandler(CommandHandler[ConfirmOrderCommand, OperationResult[OrderDto]]):
async def handle_async(self, command: ConfirmOrderCommand):
order = await self.order_repository.get_by_id_async(command.order_id)
order.confirm()
await self.order_repository.save_async(order)
# Copy-pasted event management code - DUPLICATION!
events = order.get_uncommitted_events()
for event in events:
await self.event_bus.publish_async(event)
order.mark_events_as_committed()
return self.ok(order)
# Multi-aggregate scenario is even WORSE!
class TransferInventoryHandler(CommandHandler[TransferInventoryCommand, OperationResult]):
async def handle_async(self, command: TransferInventoryCommand):
source = await self.warehouse_repository.get_by_id_async(command.source_id)
target = await self.warehouse_repository.get_by_id_async(command.target_id)
# Modify both aggregates
source.remove_inventory(command.product_id, command.quantity)
target.add_inventory(command.product_id, command.quantity)
# Save both
await self.warehouse_repository.save_async(source)
await self.warehouse_repository.save_async(target)
# PROBLEM: Must manually collect events from BOTH aggregates!
all_events = []
all_events.extend(source.get_uncommitted_events())
all_events.extend(target.get_uncommitted_events())
for event in all_events:
await self.event_bus.publish_async(event)
source.mark_events_as_committed()
target.mark_events_as_committed()
# This is TEDIOUS and ERROR-PRONE!
return self.ok()
Problems with Manual Event Management:
- ❌ Event Publishing Scattered: Every handler must remember to publish events
- ❌ Duplication: Same event management code copy-pasted everywhere
- ❌ Inconsistency Risk: Events published even if transaction fails
- ❌ Multi-Aggregate Complexity: Collecting events from multiple aggregates is tedious
- ❌ Easy to Forget: Developers forget to publish events or mark as committed
- ❌ No Coordination: No central mechanism to track modified aggregates
✅ The Solution: Unit of Work for Automatic Event Coordination
Unit of Work automatically tracks aggregates and coordinates event dispatching:
# ✅ SOLUTION: Unit of Work handles event coordination automatically
from neuroglia.data.abstractions import IUnitOfWork
class CreateOrderHandler(CommandHandler[CreateOrderCommand, OperationResult[OrderDto]]):
def __init__(self,
order_repository: OrderRepository,
unit_of_work: IUnitOfWork):
self.order_repository = order_repository
self.unit_of_work = unit_of_work
async def handle_async(self, command: CreateOrderCommand):
# 1. Create order (raises domain events)
order = Order.create(command.customer_id, command.items)
# 2. Save order
await self.order_repository.save_async(order)
# 3. Register aggregate with Unit of Work
self.unit_of_work.register_aggregate(order)
# That's IT! Pipeline behavior handles the rest:
# - Extracts events from registered aggregates
# - Publishes events ONLY if transaction succeeds
# - Marks events as committed automatically
# - Clears unit of work for next request
return self.created(order)
# Multi-aggregate scenario is SIMPLE!
class TransferInventoryHandler(CommandHandler[TransferInventoryCommand, OperationResult]):
async def handle_async(self, command: TransferInventoryCommand):
source = await self.warehouse_repository.get_by_id_async(command.source_id)
target = await self.warehouse_repository.get_by_id_async(command.target_id)
# Modify both aggregates
source.remove_inventory(command.product_id, command.quantity)
target.add_inventory(command.product_id, command.quantity)
# Save both
await self.warehouse_repository.save_async(source)
await self.warehouse_repository.save_async(target)
# Register both aggregates
self.unit_of_work.register_aggregate(source)
self.unit_of_work.register_aggregate(target)
# Unit of Work collects events from BOTH automatically!
return self.ok()
# Pipeline Behavior handles event dispatching automatically
class DomainEventDispatcherBehavior(PipelineBehavior):
def __init__(self,
unit_of_work: IUnitOfWork,
event_bus: EventBus):
self.unit_of_work = unit_of_work
self.event_bus = event_bus
async def handle_async(self, request, next_handler):
# Execute handler
result = await next_handler()
# Only publish events if handler succeeded
if result.is_success:
# Get ALL events from ALL registered aggregates
events = self.unit_of_work.get_domain_events()
# Publish events
for event in events:
await self.event_bus.publish_async(event)
# Clear unit of work for next request
self.unit_of_work.clear()
return result
# Register in DI container
services = ServiceCollection()
services.add_scoped(IUnitOfWork, UnitOfWork)
services.add_scoped(PipelineBehavior, DomainEventDispatcherBehavior)
Benefits of Unit of Work:
- ✅ Automatic Event Collection: No manual event extraction needed
- ✅ Centralized Coordination: Single place to track modified aggregates
- ✅ Consistent Event Publishing: Events only published if transaction succeeds
- ✅ Multi-Aggregate Support: Easily handle multiple aggregates in one transaction
- ✅ Reduced Duplication: Event management code in one place (pipeline)
- ✅ Hard to Forget: Framework handles event lifecycle automatically
- ✅ Testability: Unit of Work can be mocked for testing
Pattern Overview
The Unit of Work pattern serves as a coordination mechanism between your domain aggregates and the infrastructure, ensuring that:
- Transactional Consistency: All changes within a business operation succeed or fail together
- Event Coordination: Domain events are collected and dispatched automatically after successful operations
- Aggregate Tracking: Modified entities are tracked during the request lifecycle
- Clean Separation: Domain logic remains pure while infrastructure handles persistence concerns
Architecture Integration
graph TB
    subgraph "Application Layer"
CH[Command Handler]
QH[Query Handler]
end
subgraph "π Unit of Work"
UOW[IUnitOfWork]
AGG_LIST[Registered Aggregates]
EVENT_COLL[Domain Event Collection]
end
subgraph "ποΈ Domain Layer"
ENT1[Entity/Aggregate 1]
ENT2[Entity/Aggregate 2]
DE[Domain Events]
end
subgraph "π§ Infrastructure"
REPO[Repositories]
DB[(Database)]
MW[Pipeline Middleware]
end
CH --> UOW
CH --> ENT1
CH --> ENT2
ENT1 --> DE
ENT2 --> DE
UOW --> AGG_LIST
UOW --> EVENT_COLL
CH --> REPO
REPO --> DB
MW --> UOW
MW --> DE
Core Interface
The IUnitOfWork interface provides a simple contract for aggregate and event management:
from abc import ABC, abstractmethod
from typing import List
from neuroglia.data.abstractions import AggregateRoot, DomainEvent
class IUnitOfWork(ABC):
"""Unit of Work pattern for coordinating aggregate changes and domain events."""
@abstractmethod
def register_aggregate(self, aggregate: AggregateRoot) -> None:
"""Registers an aggregate for event collection and tracking."""
@abstractmethod
def get_domain_events(self) -> List[DomainEvent]:
"""Gets all domain events from registered aggregates."""
@abstractmethod
def clear(self) -> None:
"""Clears all registered aggregates and collected events."""
@abstractmethod
def has_changes(self) -> bool:
"""Determines if any aggregates have pending changes."""
Implementation Patterns
1. Basic Usage in Command Handlers
class CreateOrderHandler(CommandHandler[CreateOrderCommand, OperationResult[OrderDto]]):
def __init__(self,
order_repository: OrderRepository,
unit_of_work: IUnitOfWork):
self.order_repository = order_repository
self.unit_of_work = unit_of_work
async def handle_async(self, command: CreateOrderCommand) -> OperationResult[OrderDto]:
# 1. Create domain entity (raises domain events)
order = Order.create(command.customer_id, command.items)
# 2. Persist state
await self.order_repository.save_async(order)
# 3. Register for automatic event dispatching
self.unit_of_work.register_aggregate(order)
# 4. Return result - events dispatched automatically by middleware
return self.created(OrderDto.from_entity(order))
2. Multiple Aggregates in a Single Transaction
class ProcessPaymentHandler(CommandHandler):
async def handle_async(self, command: ProcessPaymentCommand) -> OperationResult:
# Multiple aggregates modified in single business transaction
# Update order
order = await self.order_repository.get_by_id_async(command.order_id)
order.mark_paid(command.payment_id) # Raises OrderPaidEvent
await self.order_repository.save_async(order)
self.unit_of_work.register_aggregate(order)
# Update customer account
customer = await self.customer_repository.get_by_id_async(order.customer_id)
customer.record_purchase(order.total_amount) # Raises PurchaseRecordedEvent
await self.customer_repository.save_async(customer)
self.unit_of_work.register_aggregate(customer)
# Update inventory
for item in order.items:
inventory = await self.inventory_repository.get_by_product_id(item.product_id)
inventory.reduce_stock(item.quantity) # Raises StockReducedEvent
await self.inventory_repository.save_async(inventory)
self.unit_of_work.register_aggregate(inventory)
# All events from all aggregates dispatched together
return self.ok({"order_id": order.id, "payment_processed": True})
Persistence Pattern Flexibility
The Unit of Work pattern supports multiple persistence approaches with the same infrastructure:
Pattern 1: Simple Entity with State Persistence
Complexity Level: Low
Best for: CRUD operations, simple domains, traditional persistence
class Product(Entity):  # Just Entity, no AggregateRoot needed!
"""Simple entity with domain events for state-based persistence."""
def __init__(self, name: str, price: float):
super().__init__()
self._id = str(uuid.uuid4())
self.name = name
self.price = price
# Raise domain event
self._raise_domain_event(ProductCreatedEvent(self.id, name, price))
def update_price(self, new_price: float):
"""Business logic with domain event."""
if new_price != self.price:
old_price = self.price
self.price = new_price
self._raise_domain_event(PriceChangedEvent(self.id, old_price, new_price))
# Minimal event infrastructure
def _raise_domain_event(self, event: DomainEvent):
if not hasattr(self, '_pending_events'):
self._pending_events = []
self._pending_events.append(event)
@property
def domain_events(self) -> List[DomainEvent]:
return getattr(self, '_pending_events', []).copy()
def clear_pending_events(self):
if hasattr(self, '_pending_events'):
self._pending_events.clear()
# Usage - Same UnitOfWork, simpler entity
class UpdateProductPriceHandler(CommandHandler):
async def handle_async(self, command: UpdateProductPriceCommand):
product = await self.product_repository.get_by_id_async(command.product_id)
product.update_price(command.new_price) # Raises PriceChangedEvent
await self.product_repository.save_async(product) # Save state to DB
self.unit_of_work.register_aggregate(product) # Auto-dispatch events
return self.ok(ProductDto.from_entity(product))
Characteristics:
- ✅ Direct state persistence to database
- ✅ Simple entity inheritance (Entity)
- ✅ Automatic domain event dispatching
- ✅ No event store required
- ✅ Traditional database schemas
- ✅ Easy to understand and implement
Pattern 2: Aggregate Root with Event Sourcing
Complexity Level: High
Best for: Complex domains, audit requirements, temporal queries
class OrderAggregate(AggregateRoot[OrderState, UUID]):
"""Complex aggregate with full event sourcing."""
def place_order(self, customer_id: str, items: List[OrderItem]):
"""Rich business logic with event sourcing."""
# Business rules validation
if not items:
raise DomainException("Order must have at least one item")
total = sum(item.price * item.quantity for item in items)
if total <= 0:
raise DomainException("Order total must be positive")
# Apply event to change state
event = OrderPlacedEvent(
order_id=self.id,
customer_id=customer_id,
items=items,
total_amount=total,
placed_at=datetime.utcnow()
)
# Event sourcing: event changes state + is stored for replay
self.state.on(event) # Apply to current state
self.register_event(event) # Register for persistence
def add_item(self, product_id: str, quantity: int, price: Decimal):
"""Add item with business rule validation."""
if self.state.status != OrderStatus.DRAFT:
raise DomainException("Cannot modify confirmed order")
event = ItemAddedEvent(self.id, product_id, quantity, price)
self.state.on(event)
self.register_event(event)
# Usage - Same UnitOfWork, event-sourced aggregate
class PlaceOrderHandler(CommandHandler):
async def handle_async(self, command: PlaceOrderCommand):
order = OrderAggregate()
order.place_order(command.customer_id, command.items) # Raises OrderPlacedEvent
await self.order_repository.save_async(order) # Save events to event store
self.unit_of_work.register_aggregate(order) # Auto-dispatch events
return self.created(OrderDto.from_aggregate(order))
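The aggregate above delegates state changes to an OrderState object through state.on(event). The framework's actual state base class is not shown in this document; the sketch below illustrates the idea with hand-written dispatch by event type (the field names and the OrderItem constructor are assumptions):

class OrderState:
    """Illustrative state object mutated only by applying events."""

    def __init__(self) -> None:
        self.status = OrderStatus.DRAFT
        self.customer_id: Optional[str] = None
        self.items: List[OrderItem] = []
        self.total_amount: Decimal = Decimal(0)

    def on(self, event: DomainEvent) -> None:
        # State handlers only mutate state; business rules live in the aggregate
        if isinstance(event, OrderPlacedEvent):
            self.customer_id = event.customer_id
            self.items = list(event.items)
            self.total_amount = event.total_amount
            self.status = OrderStatus.PLACED
        elif isinstance(event, ItemAddedEvent):
            self.items.append(OrderItem(event.product_id, event.quantity, event.price))
            self.total_amount += event.price * event.quantity

During replay, the same on() handlers can rebuild the state from the stored event stream.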
Characteristics:
- ✅ Complete event sourcing with event store
- ✅ Full aggregate pattern with AggregateRoot[TState, TKey]
- ✅ Rich business logic and invariant enforcement
- ✅ Temporal queries and audit trails
- ✅ Event replay and projection capabilities
- ✅ Complex consistency boundaries
Pattern 3: Hybrid Approach
Complexity Level: Moderate
Best for: Mixed requirements, gradual migration, pragmatic solutions
# Mix both patterns in the same application
class OrderProcessingHandler(CommandHandler):
async def handle_async(self, command: ProcessOrderCommand):
# Event-sourced aggregate for complex business logic
order = await self.order_repository.get_by_id_async(command.order_id)
order.process_payment(command.payment_info) # Complex event-sourced logic
await self.order_repository.save_async(order)
self.unit_of_work.register_aggregate(order)
# Simple entity for straightforward updates
inventory = await self.inventory_repository.get_by_product_id(command.product_id)
inventory.reduce_stock(command.quantity) # Simple state-based persistence
await self.inventory_repository.save_async(inventory)
self.unit_of_work.register_aggregate(inventory)
# Both patterns work together seamlessly
return self.ok()
Infrastructure Integration
Pipeline Integration
The Unit of Work integrates seamlessly with the mediation pipeline through DomainEventDispatchingMiddleware:
# Automatic setup with configuration methods
builder = WebApplicationBuilder()
Mediator.configure(builder, ["application.commands", "application.queries"])
UnitOfWork.configure(builder)
DomainEventDispatchingMiddleware.configure(builder)
# Pipeline execution flow:
# 1. Command received
# 2. [Optional] Transaction begins
# 3. Command handler executes
# 4. Handler registers aggregates with UnitOfWork
# 5. Command completes successfully
# 6. DomainEventDispatchingMiddleware collects events
# 7. Events dispatched through mediator
# 8. [Optional] Transaction commits
Event Collection Mechanism
The UnitOfWork uses duck typing to collect events from any object, supporting both patterns:
def get_domain_events(self) -> List[DomainEvent]:
"""Flexible event collection supporting multiple patterns."""
events = []
for aggregate in self._aggregates:
# Event-sourced aggregates (AggregateRoot)
if hasattr(aggregate, 'get_uncommitted_events'):
events.extend(aggregate.get_uncommitted_events())
# State-based entities (Entity + domain_events)
elif hasattr(aggregate, 'domain_events'):
events.extend(aggregate.domain_events)
# Fallback to internal events
elif hasattr(aggregate, '_pending_events'):
events.extend(aggregate._pending_events)
return events
Complexity Comparison
| Aspect | Entity + State Persistence | AggregateRoot + Event Sourcing |
|---|---|---|
| Learning Curve | Simple | Complex |
| Setup Complexity | Minimal | Significant |
| Database Requirements | Any SQL/NoSQL database | Event store + projections |
| Query Capabilities | Direct database queries | Event replay + projections |
| Business Logic | Method-based | Event-driven state machines |
| Audit & History | Manual implementation | Built-in temporal queries |
| Performance | Direct database access | Event replay overhead |
| Scalability | Traditional scaling | Event-driven scaling |
When to Use Each Pattern
Choose Entity + State Persistence When
- ✅ Building CRUD-heavy applications
- ✅ Team is new to DDD/event sourcing
- ✅ Simple business rules and workflows
- ✅ Traditional database infrastructure
- ✅ Performance is critical
- ✅ Quick prototyping and development
Choose AggregateRoot + Event Sourcing When
- ✅ Complex business domains with rich logic
- ✅ Audit trails and compliance requirements
- ✅ Temporal queries and historical analysis
- ✅ Event-driven integrations
- ✅ High consistency requirements
- ✅ Long-term maintainability over complexity
Choose Hybrid Approach When
- ✅ Mixed complexity across domain areas
- ✅ Migrating from traditional to event-sourced systems
- ✅ Different persistence requirements per bounded context
- ✅ Pragmatic balance between complexity and capability
Integration with Other Patterns
CQRS Integration
The Unit of Work pattern works seamlessly with CQRS:
# Commands use UnitOfWork for writes
class CreateProductHandler(CommandHandler):
async def handle_async(self, command):
product = Product(command.name, command.price)
await self.repository.save_async(product)
self.unit_of_work.register_aggregate(product) # Events dispatched
return self.created(product)
# Queries bypass UnitOfWork for reads
class GetProductHandler(QueryHandler):
async def handle_async(self, query):
return await self.repository.get_by_id_async(query.product_id)
# No UnitOfWork needed for read operations
Pipeline Behaviors Integration
Unit of Work integrates with Pipeline Behaviors:
# Transaction behavior + Domain event dispatching
services.add_scoped(PipelineBehavior, TransactionBehavior) # 1st: Manages DB transactions
services.add_scoped(PipelineBehavior, DomainEventDispatchingMiddleware) # 2nd: Dispatches events after success
services.add_scoped(PipelineBehavior, LoggingBehavior) # 3rd: Logs execution
# Execution order ensures events only dispatch after successful transaction commit
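The TransactionBehavior referenced above is not defined elsewhere in this document. A rough sketch of such a behavior is shown below; the db_session object and its transaction methods are hypothetical placeholders for whatever persistence session your application uses:

class TransactionBehavior(PipelineBehavior):
    """Illustrative sketch: wraps the rest of the pipeline in a database transaction."""

    def __init__(self, db_session):
        self.db_session = db_session  # hypothetical persistence session

    async def handle_async(self, request, next_handler):
        transaction = await self.db_session.begin_transaction_async()  # hypothetical API
        try:
            # The rest of the pipeline (inner behaviors and the handler) runs here
            result = await next_handler()
            if result.is_success:
                await transaction.commit_async()
            else:
                await transaction.rollback_async()
            return result
        except Exception:
            await transaction.rollback_async()
            raise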
Repository Pattern Integration
Unit of Work coordinates with Repository Pattern:
# Repository handles persistence, UnitOfWork handles events
class OrderHandler(CommandHandler):
async def handle_async(self, command):
order = Order.create(command.data) # Domain logic
await self.repository.save_async(order) # Repository persistence
self.unit_of_work.register_aggregate(order) # UnitOfWork event coordination
return self.created(order)
Testing Strategies
Unit Testing Domain Events
def test_product_creation_raises_event():
"""Test domain events are raised correctly."""
product = Product("Laptop", 999.99)
events = product.domain_events
assert len(events) == 1
assert isinstance(events[0], ProductCreatedEvent)
assert events[0].name == "Laptop"
assert events[0].price == 999.99
def test_price_update_raises_event():
"""Test business operations raise appropriate events."""
product = Product("Laptop", 999.99)
product.clear_pending_events() # Clear creation event
product.update_price(899.99)
events = product.domain_events
assert len(events) == 1
assert isinstance(events[0], PriceChangedEvent)
assert events[0].old_price == 999.99
assert events[0].new_price == 899.99
Integration Testing with UnitOfWork
@pytest.mark.asyncio
async def test_command_handler_registers_aggregates():
"""Test complete command handler workflow with UnitOfWork."""
# Setup
unit_of_work = UnitOfWork()
repository = InMemoryProductRepository()
handler = CreateProductHandler(repository, unit_of_work)
# Execute
command = CreateProductCommand("Laptop", 999.99)
result = await handler.handle_async(command)
# Verify
assert result.is_success
assert unit_of_work.has_changes()
events = unit_of_work.get_domain_events()
assert len(events) == 1
assert isinstance(events[0], ProductCreatedEvent)
@pytest.mark.asyncio
async def test_middleware_dispatches_events():
"""Test automatic event dispatching through middleware."""
# Setup
    unit_of_work = UnitOfWork()
    mock_mediator = AsyncMock()  # from unittest.mock; publish_async is awaited by the middleware
    middleware = DomainEventDispatchingMiddleware(unit_of_work, mock_mediator)
# Setup aggregate with events
product = Product("Test Product", 10.0)
unit_of_work.register_aggregate(product)
# Execute
async def successful_handler():
return OperationResult("OK", 200)
command = CreateProductCommand("Test Product", 10.0)
result = await middleware.handle_async(command, successful_handler)
# Verify
assert result.is_success
mock_mediator.publish_async.assert_called_once()
assert not unit_of_work.has_changes() # Cleared after dispatching
Best Practices
Entity Design Patterns
# ✅ Good: Business-focused methods with events
class Order(Entity):
def place_order(self, items: List[OrderItem]):
self._validate_order_items(items)
self.status = OrderStatus.PLACED
self._raise_event(OrderPlacedEvent(self.id, items))
def add_item(self, item: OrderItem):
if self.status != OrderStatus.DRAFT:
raise DomainException("Cannot modify placed order")
self.items.append(item)
self._raise_event(ItemAddedEvent(self.id, item))
# ❌ Avoid: Property setters that bypass business rules
class Order(Entity):
    @status.setter  # assumes a matching @property getter exists
    def status(self, value):
        self._status = value  # No validation, no events!
Event Design Patterns
# ✅ Good: Rich, immutable events with business context
@dataclass(frozen=True)
class OrderPlacedEvent(DomainEvent):
order_id: str
customer_id: str
items: List[OrderItem]
total_amount: Decimal
placed_at: datetime
# ❌ Avoid: Anemic events without context
@dataclass
class OrderEvent(DomainEvent):
order_id: str # Too generic, lacks business meaning
UnitOfWork Usage Patterns
# ✅ Good: Register aggregates after business operations
async def handle_async(self, command):
order = Order.create(command.items) # Business logic first
await self.repository.save_async(order) # Persistence second
self.unit_of_work.register_aggregate(order) # Event coordination last
return self.created(order)
# ❌ Avoid: Registering before business operations complete
async def handle_async(self, command):
order = Order()
self.unit_of_work.register_aggregate(order) # Too early!
order.add_items(command.items) # Business logic after registration
return self.created(order)
⚠️ Common Mistakes
1. Forgetting to Register Aggregates
# ❌ WRONG: Forgetting to register the aggregate (events never dispatched!)
async def handle_async(self, command: CreateOrderCommand):
order = Order.create(command.customer_id, command.items)
await self.repository.save_async(order)
# Forgot to register! Events will NOT be published!
return self.created(order)
# ✅ CORRECT: Always register aggregates that raise events
async def handle_async(self, command: CreateOrderCommand):
order = Order.create(command.customer_id, command.items)
await self.repository.save_async(order)
self.unit_of_work.register_aggregate(order) # Events will be published!
return self.created(order)
2. Registering Aggregates Before Operations Complete
# ❌ WRONG: Register too early (captures partial state)
async def handle_async(self, command: CreateOrderCommand):
order = Order(command.customer_id)
self.unit_of_work.register_aggregate(order) # TOO EARLY!
# These events won't be captured by unit of work!
order.add_items(command.items)
order.apply_discount(command.discount_code)
await self.repository.save_async(order)
return self.created(order)
# ✅ CORRECT: Register after all business operations
async def handle_async(self, command: CreateOrderCommand):
order = Order(command.customer_id)
order.add_items(command.items)
order.apply_discount(command.discount_code)
await self.repository.save_async(order)
self.unit_of_work.register_aggregate(order) # Captures ALL events!
return self.created(order)
3. Not Clearing Unit of Work Between Requests
# ❌ WRONG: Reusing the same Unit of Work without clearing
class MyPipelineBehavior(PipelineBehavior):
async def handle_async(self, request, next_handler):
result = await next_handler()
if result.is_success:
events = self.unit_of_work.get_domain_events()
for event in events:
await self.event_bus.publish_async(event)
# FORGOT to clear! Events accumulate across requests!
return result
# ✅ CORRECT: Always clear after processing
class MyPipelineBehavior(PipelineBehavior):
async def handle_async(self, request, next_handler):
try:
result = await next_handler()
if result.is_success:
events = self.unit_of_work.get_domain_events()
for event in events:
await self.event_bus.publish_async(event)
return result
finally:
self.unit_of_work.clear() # Always clear!
4. Using Singleton Lifetime for Unit of Work
# ❌ WRONG: Singleton lifetime (shared across all requests!)
services.add_singleton(IUnitOfWork, UnitOfWork)
# All requests share the same Unit of Work - DISASTER!
# ✅ CORRECT: Scoped lifetime (one per request)
services.add_scoped(IUnitOfWork, UnitOfWork)
# Each request gets its own Unit of Work instance
5. Publishing Events Before Transaction Commits
# ❌ WRONG: Publishing events before save completes
async def handle_async(self, command: CreateOrderCommand):
order = Order.create(command.customer_id, command.items)
# Publishing BEFORE save!
events = order.get_uncommitted_events()
for event in events:
await self.event_bus.publish_async(event)
# What if save fails? Events already published!
await self.repository.save_async(order)
return self.created(order)
# ✅ CORRECT: Let pipeline behavior publish AFTER save
async def handle_async(self, command: CreateOrderCommand):
order = Order.create(command.customer_id, command.items)
await self.repository.save_async(order)
self.unit_of_work.register_aggregate(order)
# Pipeline publishes events ONLY if handler succeeds
return self.created(order)
6. Not Handling Event Publishing Failures
# ❌ WRONG: No error handling for event publishing
class EventDispatcherBehavior(PipelineBehavior):
async def handle_async(self, request, next_handler):
result = await next_handler()
events = self.unit_of_work.get_domain_events()
for event in events:
await self.event_bus.publish_async(event) # What if this fails?
return result
# ✅ CORRECT: Handle publishing failures gracefully
class EventDispatcherBehavior(PipelineBehavior):
async def handle_async(self, request, next_handler):
result = await next_handler()
if not result.is_success:
return result
events = self.unit_of_work.get_domain_events()
try:
for event in events:
await self.event_bus.publish_async(event)
except Exception as ex:
self.logger.error(f"Failed to publish events: {ex}")
# Consider: retry, dead letter queue, or compensating transaction
# For now, log and continue (events are stored with aggregate)
finally:
self.unit_of_work.clear()
return result
When NOT to Use
1. Simple CRUD Operations Without Domain Events
# Unit of Work is overkill for simple data updates
class UpdateCustomerEmailHandler:
async def handle_async(self, command: UpdateEmailCommand):
# Simple update, no domain events needed
await self.db.customers.update_one(
{"id": command.customer_id},
{"$set": {"email": command.new_email}}
)
# No need for Unit of Work here
2. Read-Only Queries
# Unit of Work is for WRITE operations, not queries
class GetOrderByIdHandler:
async def handle_async(self, query: GetOrderByIdQuery):
# Just reading data, no events to coordinate
return await self.repository.get_by_id_async(query.order_id)
# No Unit of Work needed
3. Stateless Services Without Aggregates
# Services that don't work with domain aggregates
class PriceCalculationService:
def calculate_total(self, items: List[OrderItem]) -> Decimal:
# Pure calculation, no state changes, no events
return sum(item.price * item.quantity for item in items)
# No Unit of Work needed
4. External API Integration
# Unit of Work is for domain aggregates, not external APIs
class SendEmailHandler:
async def handle_async(self, command: SendEmailCommand):
# Calling external API, not modifying aggregates
await self.email_api.send_async(
to=command.recipient,
subject=command.subject,
body=command.body
)
# No aggregates, no Unit of Work needed
5. Background Jobs Without Domain Logic
# Simple background tasks without domain events
class CleanupOldLogsJob:
async def execute_async(self):
# Deleting old data, not raising domain events
cutoff = datetime.now() - timedelta(days=90)
await self.db.logs.delete_many({"created_at": {"$lt": cutoff}})
# No domain events, no Unit of Work needed
Key Takeaways
- Unit of Work coordinates aggregate changes and event dispatching
- Automatically collects domain events from registered aggregates
- Ensures transactional consistency by publishing events only after save succeeds
- Reduces boilerplate by eliminating manual event management in every handler
- Supports multi-aggregate transactions with centralized coordination
- Always register aggregates after business operations complete
- Use scoped lifetime (one Unit of Work per request)
- Clear Unit of Work after each request to prevent event accumulation
- Pipeline behaviors typically handle event dispatching automatically
- Framework provides IUnitOfWork interface and implementation
Related Documentation
- Simple CQRS - Command and Query handling patterns
- Pipeline Behaviors - Cross-cutting concern patterns
- State-Based Persistence - Detailed state persistence guide
- Domain Driven Design - Comprehensive DDD patterns
- Repository Pattern - Data access abstraction patterns
- Event-Driven Architecture - Event handling and integration patterns
The Unit of Work pattern provides the coordination layer that makes domain-driven design practical and maintainable, supporting both simple and complex persistence scenarios within the same architectural framework.