CQRS (Command Query Responsibility Segregation)
Time to read: 13 minutes
CQRS separates write operations (Commands) from read operations (Queries). Instead of one model doing everything, you have specialized models for reading and writing.
The Problem: One Model for Everything
The traditional approach uses the same model for reads and writes:
# ❌ Single model handles everything
from typing import List


class OrderService:
    def __init__(self, repository: OrderRepository):
        self.repository = repository

    # Write operation
    def create_order(self, customer_id: str, items: List[dict]) -> Order:
        order = Order(customer_id)
        for item in items:
            order.add_item(item['pizza'], item['quantity'])
        self.repository.save(order)
        return order

    # Read operation
    def get_order(self, order_id: str) -> Order:
        return self.repository.get_by_id(order_id)

    # Read operation
    def get_customer_orders(self, customer_id: str) -> List[Order]:
        return self.repository.find_by_customer(customer_id)

    # Read operation (complex query)
    def get_order_statistics(self, date_from, date_to):
        orders = self.repository.find_by_date_range(date_from, date_to)
        # Complex aggregation logic here...
        # (placeholder result so the example is complete; the real aggregation is elided)
        statistics = {"order_count": len(orders)}
        return statistics
Problems:
- Conflicting concerns: Writes need validation; reads need speed
- Complex queries: The domain model is not optimized for reporting
- Scalability: Reads and writes can't be scaled independently
- Performance: Writes and reads contend for the same resources
- Security: Reads and writes share the same permissions
The Solution: Separate Read and Write Models
Split operations into Commands (write) and Queries (read):
┌──────────────────────────────────────────────────┐
│                   Application                    │
│                                                  │
│  Commands (Write)          Queries (Read)        │
│  ┌────────────────┐        ┌─────────────────┐   │
│  │ PlaceOrder     │        │ GetOrderById    │   │
│  │ ConfirmOrder   │        │ GetCustomerOrds │   │
│  │ CancelOrder    │        │ GetStatistics   │   │
│  └────────┬───────┘        └────────┬────────┘   │
│           │                         │            │
│           ▼                         ▼            │
│  ┌────────────────┐        ┌─────────────────┐   │
│  │ Write Model    │        │ Read Model      │   │
│  │ (Domain Agg)   │        │ (Optimized DTO) │   │
│  │ - Rich domain  │        │ - Flat, denorm  │   │
│  │ - Validations  │        │ - Fast queries  │   │
│  └────────┬───────┘        └────────┬────────┘   │
│           │                         │            │
│           ▼                         ▼            │
│  ┌────────────────┐        ┌─────────────────┐   │
│  │ Write DB       │        │ Read DB         │   │
│  │ (Normalized)   │        │ (Denormalized)  │   │
│  └────────────────┘        └─────────────────┘   │
│           │                         ▲            │
│           └──────── Events ─────────┘            │
│                                                  │
└──────────────────────────────────────────────────┘
Benefits:
- Optimized models: Write model for consistency, read model for speed
- Independent scaling: Scale reads and writes separately
- Simpler code: Each operation has a single purpose
- Better performance: Reads no longer contend with writes for the same resources
- Flexibility: Different databases for reads and writes
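The split can be illustrated without any framework. The following is a minimal sketch, assuming an in-memory dictionary in place of a database; the names `PlaceOrder`, `GetCustomerOrderCount`, `InMemoryOrderStore`, and both handler classes are hypothetical and are not part of Neuroglia.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass(frozen=True)
class PlaceOrder:
    """Command: asks the system to change state."""
    order_id: str
    customer_id: str
    items: List[str]


@dataclass(frozen=True)
class GetCustomerOrderCount:
    """Query: asks for data and changes nothing."""
    customer_id: str


class InMemoryOrderStore:
    """Hypothetical stand-in for a database; keeps orders in a dict."""

    def __init__(self) -> None:
        self.orders: Dict[str, PlaceOrder] = {}


class PlaceOrderHandler:
    """Write side: validates, then mutates the store."""

    def __init__(self, store: InMemoryOrderStore) -> None:
        self.store = store

    def handle(self, command: PlaceOrder) -> bool:
        if not command.items:
            return False  # Rejected by a business rule; nothing is stored
        self.store.orders[command.order_id] = command
        return True


class GetCustomerOrderCountHandler:
    """Read side: only reads from the store."""

    def __init__(self, store: InMemoryOrderStore) -> None:
        self.store = store

    def handle(self, query: GetCustomerOrderCount) -> int:
        return sum(
            1 for order in self.store.orders.values()
            if order.customer_id == query.customer_id
        )


store = InMemoryOrderStore()
placed = PlaceOrderHandler(store).handle(PlaceOrder("o-1", "c-1", ["Margherita"]))
rejected = PlaceOrderHandler(store).handle(PlaceOrder("o-2", "c-1", []))
count = GetCustomerOrderCountHandler(store).handle(GetCustomerOrderCount("c-1"))
```

Both handlers share one store here for brevity. The separation is in the code paths: only the command handler writes, so the read side could later be pointed at a different, denormalized store without touching the write side.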
Commands: Write Operations
Commands represent intentions to change state:
from dataclasses import dataclass
from typing import List

from neuroglia.mediation import Command, OperationResult


@dataclass
class PlaceOrderCommand(Command[OperationResult[OrderDto]]):
    """
    Command: Imperative name (verb).
    Expresses intention to change state.
    """
    customer_id: str
    items: List[OrderItemDto]
    delivery_address: DeliveryAddressDto


@dataclass
class ConfirmOrderCommand(Command[OperationResult]):
    """Command to confirm an order."""
    order_id: str


@dataclass
class CancelOrderCommand(Command[OperationResult]):
    """Command to cancel an order."""
    order_id: str
    reason: str
Command Characteristics:
- Imperative names: PlaceOrder, ConfirmOrder, CancelOrder (actions)
- Write operations: Change system state
- Can fail: Validation, business rules
- Return results: Success/failure, errors
- Single purpose: Do one thing
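The "can fail" and "return results" characteristics can be sketched with a small result wrapper. This is an illustration only: `Result`, `CancelOrder`, and `cancel_order` are hypothetical names, and `Result` is a simplified stand-in, not Neuroglia's `OperationResult`. The rule that delivered orders cannot be cancelled is also an assumption made for the example.

```python
from dataclasses import dataclass
from typing import Generic, Optional, TypeVar

T = TypeVar("T")


@dataclass(frozen=True)
class Result(Generic[T]):
    """Hypothetical result wrapper (not Neuroglia's OperationResult)."""
    status_code: int
    data: Optional[T] = None
    error_message: Optional[str] = None

    @property
    def is_success(self) -> bool:
        return 200 <= self.status_code < 300


@dataclass(frozen=True)
class CancelOrder:
    order_id: str
    reason: str


def cancel_order(command: CancelOrder, order_statuses: dict) -> Result[str]:
    """Single-purpose handler: cancel one order, or report why it cannot be done."""
    status = order_statuses.get(command.order_id)
    if status is None:
        return Result(404, error_message="Order not found")
    if status == "delivered":
        # Assumed business rule, for illustration only
        return Result(400, error_message="Delivered orders cannot be cancelled")
    order_statuses[command.order_id] = "cancelled"
    return Result(200, data=command.order_id)


statuses = {"o-1": "pending", "o-2": "delivered"}
ok = cancel_order(CancelOrder("o-1", "changed my mind"), statuses)
refused = cancel_order(CancelOrder("o-2", "too late"), statuses)
missing = cancel_order(CancelOrder("o-9", "typo"), statuses)
```

Returning a result object instead of raising keeps expected failures (validation, business rules) in the normal control flow, so the caller can map them to a response without exception handling.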
Queries: Read Operations
Queries represent requests for data:
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional

from neuroglia.mediation import Query


@dataclass
class GetOrderByIdQuery(Query[OrderDto]):
    """
    Query: Question-like name.
    Requests data without changing state.
    """
    order_id: str


@dataclass
class GetCustomerOrdersQuery(Query[List[OrderDto]]):
    """Query to get customer's orders."""
    customer_id: str
    status: Optional[OrderStatus] = None


@dataclass
class GetOrderStatisticsQuery(Query[OrderStatistics]):
    """Query for order statistics."""
    date_from: datetime
    date_to: datetime
Query Characteristics:
- Question names: GetOrderById, GetCustomerOrders (questions)
- Read-only: Don't change state
- Don't fail on missing data: Return empty/None if not found
- Return data: DTOs, lists, aggregates
- Can be cached: Since they don't change state
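Because a query has no side effects, its result can be memoized. The sketch below assumes a frozen dataclass query (frozen dataclasses are hashable, so the query itself can serve as the cache key) and a hypothetical `CachingQueryHandler` wrapper; neither is a Neuroglia API.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Hashable, Optional


@dataclass(frozen=True)  # frozen makes the query hashable, so it can be a cache key
class GetOrderById:
    order_id: str


class CachingQueryHandler:
    """Hypothetical wrapper that memoizes the results of a read-only handler."""

    def __init__(self, inner: Callable[[Hashable], Optional[dict]]) -> None:
        self.inner = inner
        self.cache: Dict[Hashable, Optional[dict]] = {}
        self.misses = 0

    def handle(self, query: Hashable) -> Optional[dict]:
        if query not in self.cache:
            self.misses += 1
            self.cache[query] = self.inner(query)
        return self.cache[query]

    def invalidate(self) -> None:
        """Call after a command changes the underlying data."""
        self.cache.clear()


ORDERS = {"o-1": {"order_id": "o-1", "status": "pending"}}


def load_order(query: GetOrderById) -> Optional[dict]:
    # Missing data yields None rather than an exception
    return ORDERS.get(query.order_id)


handler = CachingQueryHandler(load_order)
first = handler.handle(GetOrderById("o-1"))
second = handler.handle(GetOrderById("o-1"))  # served from the cache
not_found = handler.handle(GetOrderById("o-404"))
```

The cache must be invalidated (or given a time-to-live) whenever a command changes the data it covers; otherwise queries return stale results.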
CQRS in Neuroglia
Command Handlers
Handle write operations:
from neuroglia.mediation import CommandHandler, OperationResult
from neuroglia.mapping import Mapper


class PlaceOrderCommandHandler(CommandHandler[PlaceOrderCommand, OperationResult[OrderDto]]):
    """Handles PlaceOrderCommand - write operation."""

    def __init__(self,
                 repository: IOrderRepository,
                 mapper: Mapper):
        self.repository = repository
        self.mapper = mapper

    async def handle_async(self, command: PlaceOrderCommand) -> OperationResult[OrderDto]:
        """
        Command handler:
        1. Validate
        2. Create domain entity
        3. Apply business rules
        4. Persist
        5. Return result
        """
        # 1. Validate
        if not command.items:
            return self.bad_request("Order must have at least one item")

        # 2. Create domain entity
        order = Order(command.customer_id)

        # 3. Apply business rules (through domain model)
        for item_dto in command.items:
            order.add_item(
                item_dto.pizza_name,
                item_dto.size,
                item_dto.quantity,
                item_dto.price
            )
        order.set_delivery_address(self.mapper.map(
            command.delivery_address,
            DeliveryAddress
        ))

        # 4. Persist
        await self.repository.save_async(order)

        # 5. Return result
        order_dto = self.mapper.map(order, OrderDto)
        return self.created(order_dto)
Query Handlers
Handle read operations:
from typing import List, Optional

from neuroglia.mapping import Mapper
from neuroglia.mediation import QueryHandler


class GetOrderByIdQueryHandler(QueryHandler[GetOrderByIdQuery, OrderDto]):
    """Handles GetOrderByIdQuery - read operation."""

    def __init__(self,
                 repository: IOrderRepository,
                 mapper: Mapper):
        self.repository = repository
        self.mapper = mapper

    async def handle_async(self, query: GetOrderByIdQuery) -> Optional[OrderDto]:
        """
        Query handler:
        1. Retrieve data
        2. Transform to DTO
        3. Return (don't validate, don't modify)
        """
        # 1. Retrieve
        order = await self.repository.get_by_id_async(query.order_id)
        if not order:
            return None

        # 2. Transform
        return self.mapper.map(order, OrderDto)


class GetCustomerOrdersQueryHandler(QueryHandler[GetCustomerOrdersQuery, List[OrderDto]]):
    """Handles GetCustomerOrdersQuery - read operation."""

    def __init__(self,
                 repository: IOrderRepository,
                 mapper: Mapper):
        self.repository = repository
        self.mapper = mapper

    async def handle_async(self, query: GetCustomerOrdersQuery) -> List[OrderDto]:
        """Optimized read - may use denormalized read model."""
        # Query optimized read model (not domain model!)
        orders = await self.repository.find_by_customer_async(
            query.customer_id,
            status=query.status
        )
        return [self.mapper.map(o, OrderDto) for o in orders]
Using Mediator
The mediator dispatches commands and queries to their handlers:
from typing import List

from fastapi import HTTPException
from neuroglia.mediation import Mediator


class OrdersController:
    def __init__(self, mediator: Mediator):
        self.mediator = mediator

    @post("/orders")
    async def create_order(self, dto: CreateOrderDto) -> OrderDto:
        """Write operation - use command."""
        command = PlaceOrderCommand(
            customer_id=dto.customer_id,
            items=dto.items,
            delivery_address=dto.delivery_address
        )
        result = await self.mediator.execute_async(command)
        return self.process(result)  # Returns 201 Created

    @get("/orders/{order_id}")
    async def get_order(self, order_id: str) -> OrderDto:
        """Read operation - use query."""
        query = GetOrderByIdQuery(order_id=order_id)
        result = await self.mediator.execute_async(query)
        if not result:
            raise HTTPException(status_code=404, detail="Order not found")
        return result  # Returns 200 OK

    @get("/customers/{customer_id}/orders")
    async def get_customer_orders(self, customer_id: str) -> List[OrderDto]:
        """Read operation - use query."""
        query = GetCustomerOrdersQuery(customer_id=customer_id)
        return await self.mediator.execute_async(query)
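To make the dispatch step concrete, here is a minimal, framework-free sketch of a mediator that maps each request type to one handler. `SimpleMediator`, its `register` method, and the two handler functions are hypothetical; Neuroglia's `Mediator` resolves handlers through its own mechanisms, which this sketch does not attempt to reproduce.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Dict, Type


@dataclass
class PlaceOrder:
    customer_id: str


@dataclass
class GetOrderCount:
    customer_id: str


class SimpleMediator:
    """Hypothetical mediator: maps a request type to exactly one handler."""

    def __init__(self) -> None:
        self._handlers: Dict[Type, Callable[[Any], Awaitable[Any]]] = {}

    def register(self, request_type: Type, handler: Callable[[Any], Awaitable[Any]]) -> None:
        self._handlers[request_type] = handler

    async def execute_async(self, request: Any) -> Any:
        handler = self._handlers.get(type(request))
        if handler is None:
            raise LookupError(f"No handler registered for {type(request).__name__}")
        return await handler(request)


orders_by_customer: Dict[str, int] = {}


async def place_order(command: PlaceOrder) -> str:
    # Write side: the only place where state changes
    orders_by_customer[command.customer_id] = orders_by_customer.get(command.customer_id, 0) + 1
    return "created"


async def get_order_count(query: GetOrderCount) -> int:
    # Read side: returns data without modifying it
    return orders_by_customer.get(query.customer_id, 0)


async def main() -> tuple:
    mediator = SimpleMediator()
    mediator.register(PlaceOrder, place_order)
    mediator.register(GetOrderCount, get_order_count)
    status = await mediator.execute_async(PlaceOrder("c-1"))
    count = await mediator.execute_async(GetOrderCount("c-1"))
    return status, count


result = asyncio.run(main())
```

The caller only knows the request object and the mediator, which is what lets a controller stay unaware of which handler, repository, or database serves a given request.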
Advanced: Separate Read and Write Models
For high-scale systems, use different databases:
from dataclasses import dataclass
from datetime import datetime, timezone
from decimal import Decimal
from typing import List, Optional


# Write Model: Domain aggregate (normalized, consistent)
class Order(AggregateRoot):
    """Write model - rich domain model."""

    def __init__(self, customer_id: str):
        super().__init__()
        self.customer_id = customer_id
        self.items: List[OrderItem] = []
        self.status = OrderStatus.PENDING

    def add_item(self, pizza_name: str, size, quantity: int, price: Decimal):
        # Business logic, validation
        pass


# Write Repository: Saves domain aggregates
class OrderWriteRepository:
    """Saves to write database (normalized)."""

    async def save_async(self, order: Order):
        await self.mongo_collection.insert_one({
            "id": order.id,
            "customer_id": order.customer_id,
            "items": [item.to_dict() for item in order.items],
            "status": order.status.value
        })


# Read Model: Flat DTO (denormalized, fast)
@dataclass
class OrderReadModel:
    """Read model - optimized for queries."""
    order_id: str
    customer_id: str
    customer_name: str   # Denormalized from Customer
    customer_email: str  # Denormalized from Customer
    total: Decimal
    item_count: int
    status: str
    created_at: datetime
    # Flattened, no joins needed!


# Read Repository: Queries read model
class OrderReadRepository:
    """Queries from read database (denormalized)."""

    async def get_by_id_async(self, order_id: str) -> Optional[OrderReadModel]:
        # Query denormalized view - very fast!
        # Exclude MongoDB's "_id" so the document matches the dataclass fields
        doc = await self.read_collection.find_one({"order_id": order_id}, {"_id": 0})
        return OrderReadModel(**doc) if doc else None


# Synchronize via events
class OrderConfirmedHandler:
    """Updates read model when write model changes."""

    async def handle(self, event: OrderConfirmedEvent):
        # Update read model
        await self.read_repo.update({
            "order_id": event.order_id,
            "status": "confirmed",
            "confirmed_at": datetime.now(timezone.utc)
        })
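The synchronization step can be tried out in isolation. The following sketch assumes an in-process event bus and an in-memory projection; `EventBus`, `OrderProjection`, `OrderPlaced`, and `OrderConfirmed` are hypothetical names chosen for the example, and a production system would more likely publish events through a message broker.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List


@dataclass(frozen=True)
class OrderPlaced:
    order_id: str
    customer_name: str
    total: float


@dataclass(frozen=True)
class OrderConfirmed:
    order_id: str


class EventBus:
    """Hypothetical in-process bus; a real system might use a message broker."""

    def __init__(self) -> None:
        self._subscribers: Dict[type, List[Callable]] = {}

    def subscribe(self, event_type: type, handler: Callable) -> None:
        self._subscribers.setdefault(event_type, []).append(handler)

    def publish(self, event: object) -> None:
        for handler in self._subscribers.get(type(event), []):
            handler(event)


class OrderProjection:
    """Keeps one flat, denormalized row per order for fast reads."""

    def __init__(self) -> None:
        self.rows: Dict[str, dict] = {}

    def on_placed(self, event: OrderPlaced) -> None:
        self.rows[event.order_id] = {
            "order_id": event.order_id,
            "customer_name": event.customer_name,
            "total": event.total,
            "status": "pending",
        }

    def on_confirmed(self, event: OrderConfirmed) -> None:
        # Skip events for unknown orders so a stray event cannot crash the projection
        if event.order_id in self.rows:
            self.rows[event.order_id]["status"] = "confirmed"


bus = EventBus()
projection = OrderProjection()
bus.subscribe(OrderPlaced, projection.on_placed)
bus.subscribe(OrderConfirmed, projection.on_confirmed)

bus.publish(OrderPlaced("o-1", "Ada", 15.99))
bus.publish(OrderConfirmed("o-1"))
bus.publish(OrderConfirmed("o-unknown"))
```

This bus delivers events synchronously, so the read model is updated immediately. With an asynchronous transport the read model lags behind the write model for a short time (eventual consistency), which callers of the read side need to tolerate.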
Testing CQRS
Test Command Handlers
from decimal import Decimal
from unittest.mock import Mock

import pytest

# `mapper` is assumed to be a configured Mapper instance available at module level


@pytest.mark.asyncio
async def test_place_order_command():
    """Test write operation."""
    # Arrange
    mock_repo = Mock(spec=IOrderRepository)
    handler = PlaceOrderCommandHandler(mock_repo, mapper)
    command = PlaceOrderCommand(
        customer_id="123",
        items=[OrderItemDto("Margherita", PizzaSize.LARGE, 1, Decimal("15.99"))],
        delivery_address=DeliveryAddressDto("123 Main St", "City", "12345")
    )

    # Act
    result = await handler.handle_async(command)

    # Assert
    assert result.is_success
    assert result.status_code == 201
    mock_repo.save_async.assert_called_once()


@pytest.mark.asyncio
async def test_place_order_validation():
    """Test command validation."""
    mock_repo = Mock(spec=IOrderRepository)
    handler = PlaceOrderCommandHandler(mock_repo, mapper)
    command = PlaceOrderCommand(
        customer_id="123",
        items=[],  # Invalid: no items
        delivery_address=DeliveryAddressDto("123 Main St", "City", "12345")
    )

    result = await handler.handle_async(command)

    assert not result.is_success
    assert result.status_code == 400
    assert "at least one item" in result.error_message
    mock_repo.save_async.assert_not_called()  # Rejected commands must not persist
Test Query Handlers
from unittest.mock import Mock

import pytest

# `mapper` and `create_test_order` are assumed to be test helpers defined elsewhere


@pytest.mark.asyncio
async def test_get_order_query():
    """Test read operation."""
    # Arrange
    mock_repo = Mock(spec=IOrderRepository)
    mock_repo.get_by_id_async.return_value = create_test_order()
    handler = GetOrderByIdQueryHandler(mock_repo, mapper)
    query = GetOrderByIdQuery(order_id="123")

    # Act
    result = await handler.handle_async(query)

    # Assert
    assert result is not None
    assert result.order_id == "123"
    mock_repo.get_by_id_async.assert_called_once_with("123")


@pytest.mark.asyncio
async def test_get_order_not_found():
    """Test query with no result."""
    mock_repo = Mock(spec=IOrderRepository)
    mock_repo.get_by_id_async.return_value = None
    handler = GetOrderByIdQueryHandler(mock_repo, mapper)
    query = GetOrderByIdQuery(order_id="999")

    result = await handler.handle_async(query)

    assert result is None  # Query returns None, doesn't raise
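The same testing approach can be tried without the framework. The sketch below uses `unittest.mock.AsyncMock` from the standard library, whose attributes can be awaited directly; `GetOrderStatusHandler` and the dictionary-shaped order are hypothetical simplifications, not the handlers defined earlier.

```python
import asyncio
from typing import Optional
from unittest.mock import AsyncMock


class GetOrderStatusHandler:
    """Hypothetical query handler with an injected async repository."""

    def __init__(self, repository) -> None:
        self.repository = repository

    async def handle_async(self, order_id: str) -> Optional[str]:
        order = await self.repository.get_by_id_async(order_id)
        return order["status"] if order else None


def run_found_case() -> Optional[str]:
    repo = AsyncMock()
    repo.get_by_id_async.return_value = {"order_id": "123", "status": "pending"}
    status = asyncio.run(GetOrderStatusHandler(repo).handle_async("123"))
    # Verifies the repository was awaited exactly once with the query's id
    repo.get_by_id_async.assert_awaited_once_with("123")
    return status


def run_missing_case() -> Optional[str]:
    repo = AsyncMock()
    repo.get_by_id_async.return_value = None
    return asyncio.run(GetOrderStatusHandler(repo).handle_async("999"))


found_status = run_found_case()
missing_status = run_missing_case()
```

Because handlers receive their dependencies through the constructor, each side can be tested with a mock repository and no database.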
Common Mistakes
1. Queries that Modify State
# ❌ WRONG: Query modifies state
class GetOrderByIdQueryHandler(QueryHandler):
    async def handle_async(self, query):
        order = await self.repository.get_by_id_async(query.order_id)
        order.last_viewed = datetime.utcnow()  # NO! Modifying state in query!
        await self.repository.save_async(order)
        return order


# ✅ RIGHT: Queries are read-only
class GetOrderByIdQueryHandler(QueryHandler):
    async def handle_async(self, query):
        order = await self.repository.get_by_id_async(query.order_id)
        return self.mapper.map(order, OrderDto)  # Read-only
2. Commands that Return Data
# ❌ WRONG: Command returns full entity
class PlaceOrderCommand(Command[Order]):  # Returns entity
    pass


# ✅ RIGHT: Command returns result/DTO
class PlaceOrderCommand(Command[OperationResult[OrderDto]]):  # Returns DTO
    pass
3. Business Logic in Query Handlers
# ❌ WRONG: Validation in query
class GetOrderQueryHandler(QueryHandler):
    async def handle_async(self, query):
        if not query.order_id:
            raise ValueError("Order ID required")  # Query shouldn't validate!
        return await self.repository.get_by_id_async(query.order_id)


# ✅ RIGHT: Validation in command only
class ConfirmOrderCommandHandler(CommandHandler):
    async def handle_async(self, command):
        if not command.order_id:
            return self.bad_request("Order ID required")  # Validation in command
        # ...
When NOT to Use CQRS
CQRS adds complexity. Skip when:
- Simple CRUD: Basic create/read/update/delete
- Low Scale: Single-server application
- No Specialized Reads: Reads and writes have same needs
- Prototypes: Quick experiments
- Small Team: Learning curve not worth it
For simple apps, traditional layered architecture works fine.
Key Takeaways
- Separation: Commands write, queries read
- Optimization: Each side optimized for its purpose
- Scalability: Scale reads and writes independently
- Clarity: Single responsibility per operation
- Flexibility: Different models, databases possible
CQRS + Other Patterns
Command → Command Handler → Domain Model → Write DB
                                 ↓
                               Event
                                 ↓
                           Event Handler → Read Model → Read DB
                                               ↑
Query → Query Handler ─────────────────────────┘
Next Steps
- Implement it: Tutorial Part 3 builds CQRS
- Dispatch requests: Mediator Pattern routes commands/queries
- Handle events: Event-Driven Architecture synchronizes models
Further Reading
- Greg Young's CQRS Documents
- Martin Fowler's CQRS
- Microsoft's CQRS Pattern