Aggregates & Entities
Time to read: 12 minutes
Aggregates are clusters of domain objects treated as a single unit for data changes. They're the key to maintaining consistency in complex domain models.
The Problem: Inconsistent State
Without aggregates, related objects can become inconsistent:
# No aggregate: objects can be inconsistent
order = Order(id="123")
order.status = "confirmed"
# Someone modifies items after confirmation
order.items.append(OrderItem("Pepperoni", 1)) # Breaks business rule!
# Someone changes delivery without validation
order.delivery_address = None # Confirmed order with no address!
# Total out of sync with items
order.total = 50.0 # Items actually total $75!
Problems:
- No consistency: Related objects can contradict each other
- No invariants: Business rules not enforced
- No boundaries: Anyone can modify anything
- Race conditions: Concurrent changes cause conflicts
- Hard to reason about: Too many moving parts
The Solution: Aggregate Pattern
Group related objects with one root controlling access:
┌────────────────────────────────────────────┐
│  Order Aggregate (Consistency Boundary)    │
│                                            │
│  ┌──────────────────────────────────────┐  │
│  │ Order (Aggregate Root)               │  │
│  │ - id, customer_id, status, created   │  │
│  │ - add_item(), confirm(), cancel()    │  │
│  └──────────────────────────────────────┘  │
│        │                                   │
│        ├── OrderItem (value object)        │
│        ├── OrderItem (value object)        │
│        └── DeliveryAddress (value obj)     │
│                                            │
└────────────────────────────────────────────┘
Rules:
1. External code ONLY accesses Order (root)
2. Order enforces ALL consistency rules
3. Order and items saved/loaded as ONE UNIT
4. Changes happen through Order methods only
Benefits:
- Guaranteed consistency: Root enforces invariants
- Clear boundaries: One entry point
- Transactional: Aggregate saved as a unit
- Concurrency control: Lock at aggregate level
- Easy to reason about: Complexity contained
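The "lock at aggregate level" benefit is often realized with optimistic concurrency: the aggregate carries a version number, and a save is rejected when the stored version has moved on since the aggregate was loaded. The sketch below is a minimal in-memory illustration of that idea. `VersionedOrder`, `InMemoryOrderStore`, and `ConcurrencyError` are hypothetical names and are not part of Neuroglia.

```python
import copy
from dataclasses import dataclass, field
from typing import Dict, List


class ConcurrencyError(Exception):
    """Raised when an aggregate changed in the store after it was loaded."""


@dataclass
class VersionedOrder:
    id: str
    items: List[str] = field(default_factory=list)
    version: int = 0  # Incremented by the store on every successful save


class InMemoryOrderStore:
    """Hypothetical store that saves the whole aggregate as one unit."""

    def __init__(self) -> None:
        self._rows: Dict[str, VersionedOrder] = {}

    def get(self, order_id: str) -> VersionedOrder:
        # Hand out a copy so callers cannot mutate stored state directly
        return copy.deepcopy(self._rows[order_id])

    def save(self, order: VersionedOrder) -> None:
        stored = self._rows.get(order.id)
        if stored is not None and stored.version != order.version:
            raise ConcurrencyError(f"Order {order.id} was modified concurrently")
        order.version += 1
        self._rows[order.id] = copy.deepcopy(order)


store = InMemoryOrderStore()
store.save(VersionedOrder(id="123"))

first = store.get("123")
second = store.get("123")

first.items.append("Margherita")
store.save(first)  # Succeeds: the stored version matched

second.items.append("Pepperoni")
try:
    store.save(second)  # Rejected: loaded before the first save landed
    conflict_detected = False
except ConcurrencyError:
    conflict_detected = True
```

Because the version check covers the whole aggregate, two writers can never interleave changes to the same order, while unrelated orders never block each other.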
Anatomy of an Aggregate
Aggregate Root
The single entry point for the aggregate:
import uuid
from decimal import Decimal
from typing import List, Optional

# OrderStatus, PizzaSize, and InvalidOperationError are assumed to be
# defined elsewhere in the domain layer.

class Order:  # AGGREGATE ROOT
    """
    Order aggregate root.
    - External code accesses the order through this class
    - Order controls all its child objects
    - Order enforces business invariants
    """

    def __init__(self, customer_id: str):
        self.id = str(uuid.uuid4())
        self.customer_id = customer_id
        self._items: List[OrderItem] = []  # Private!
        self._status = OrderStatus.PENDING
        self._delivery_address: Optional[DeliveryAddress] = None

    # Public methods enforce rules
    def add_item(self, pizza_name: str, size: PizzaSize, quantity: int, price: Decimal):
        """Add item through root - rules enforced!"""
        if self._status != OrderStatus.PENDING:
            raise InvalidOperationError("Cannot modify confirmed orders")
        item = OrderItem(pizza_name, size, quantity, price)
        self._items.append(item)

    def set_delivery_address(self, address: DeliveryAddress):
        """Set address through root - validation enforced!"""
        if not address.street or not address.city:
            raise ValueError("Complete address required")
        self._delivery_address = address

    def confirm(self):
        """Confirm order - invariants checked!"""
        if not self._items:
            raise InvalidOperationError("Cannot confirm empty order")
        if not self._delivery_address:
            raise InvalidOperationError("Delivery address required")
        if self.total() < Decimal("10"):
            raise InvalidOperationError("Minimum order is $10")
        self._status = OrderStatus.CONFIRMED

    def total(self) -> Decimal:
        """Calculate total - always consistent with items!"""
        # Start from Decimal("0") so an empty order still returns a Decimal
        return sum((item.subtotal() for item in self._items), Decimal("0"))

    # Read-only access to children
    @property
    def items(self) -> List[OrderItem]:
        return self._items.copy()  # Return copy, not reference!

    @property
    def status(self) -> OrderStatus:
        return self._status
Child Objects
Internal to the aggregate and accessed only through the root:
from dataclasses import dataclass
from decimal import Decimal
from typing import Optional

@dataclass(frozen=True)  # Immutable value object
class OrderItem:
    """Child object of Order aggregate."""
    pizza_name: str
    size: PizzaSize
    quantity: int
    price: Decimal

    def subtotal(self) -> Decimal:
        return self.price * self.quantity

@dataclass(frozen=True)
class DeliveryAddress:
    """Child object of Order aggregate."""
    street: str
    city: str
    zip_code: str
    delivery_instructions: Optional[str] = None
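Because these value objects are frozen, changing one means building a new instance and letting the root swap it in. The sketch below shows that behavior using only the standard library. `Address` is a simplified stand-in for `DeliveryAddress`, introduced here for illustration.

```python
from dataclasses import FrozenInstanceError, dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class Address:
    street: str
    city: str
    zip_code: str
    delivery_instructions: Optional[str] = None


home = Address("1 Main St", "Springfield", "12345")

# Direct mutation is rejected by the frozen dataclass
try:
    home.city = "Shelbyville"
    mutated = True
except FrozenInstanceError:
    mutated = False

# replace() builds a new value object with the changed field
updated = replace(home, delivery_instructions="Ring twice")

# Value objects compare by value, not identity
same_value = Address("1 Main St", "Springfield", "12345") == home
```

Equality by value is what makes it safe for the root to replace an address wholesale instead of editing it in place.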
Aggregates in Neuroglia
Using Entity and AggregateRoot
Neuroglia provides base classes:
from decimal import Decimal
from typing import List

from neuroglia.core import Entity, AggregateRoot

class Order(AggregateRoot):
    """
    AggregateRoot provides:
    - Unique ID generation
    - Domain event collection
    - Event raising/retrieval
    """

    def __init__(self, customer_id: str):
        super().__init__()  # Generates ID, initializes events
        self.customer_id = customer_id
        self.items: List[OrderItem] = []
        self.status = OrderStatus.PENDING
        # Raise domain event
        self.raise_event(OrderCreatedEvent(
            order_id=self.id,
            customer_id=customer_id
        ))

    def add_item(self, pizza_name: str, size: PizzaSize, quantity: int, price: Decimal):
        # Validation
        if self.status != OrderStatus.PENDING:
            raise InvalidOperationError("Cannot modify confirmed orders")
        # Create value object
        item = OrderItem(pizza_name, size, quantity, price)
        self.items.append(item)
        # Raise domain event
        self.raise_event(ItemAddedToOrderEvent(
            order_id=self.id,
            pizza_name=pizza_name,
            quantity=quantity
        ))

    def confirm(self):
        # Business rules
        if not self.items:
            raise InvalidOperationError("Cannot confirm empty order")
        if self.total() < Decimal("10"):
            raise InvalidOperationError("Minimum order is $10")
        # State change
        self.status = OrderStatus.CONFIRMED
        # Raise domain event
        self.raise_event(OrderConfirmedEvent(
            order_id=self.id,
            customer_id=self.customer_id,
            total=self.total()
        ))

    def total(self) -> Decimal:
        # Start from Decimal("0") so an empty order still returns a Decimal
        return sum((item.subtotal() for item in self.items), Decimal("0"))

    # Get uncommitted events (for persistence)
    def get_uncommitted_events(self) -> List[DomainEvent]:
        return self._uncommitted_events.copy()
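To make the event-collection behavior concrete without depending on the framework, here is a minimal stand-in for such a base class. It is a sketch of the idea only: `SketchAggregateRoot`, `mark_events_committed`, and the `OrderCreated` dataclass are illustrative assumptions and may differ from Neuroglia's actual `AggregateRoot`.

```python
import uuid
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class OrderCreated:
    """Illustrative event; not a Neuroglia type."""
    order_id: str
    customer_id: str


class SketchAggregateRoot:
    """Stand-in base class: generates an ID and collects raised events."""

    def __init__(self) -> None:
        self.id = str(uuid.uuid4())
        self._uncommitted_events: List[object] = []

    def raise_event(self, event: object) -> None:
        self._uncommitted_events.append(event)

    def get_uncommitted_events(self) -> List[object]:
        # Return a copy so callers cannot alter the pending list
        return self._uncommitted_events.copy()

    def mark_events_committed(self) -> None:
        # A repository would call this after persisting and publishing
        self._uncommitted_events.clear()


class SketchOrder(SketchAggregateRoot):
    def __init__(self, customer_id: str) -> None:
        super().__init__()
        self.customer_id = customer_id
        self.raise_event(OrderCreated(order_id=self.id, customer_id=customer_id))


order = SketchOrder(customer_id="123")
pending = order.get_uncommitted_events()
order.mark_events_committed()
```

Events accumulate on the aggregate until persistence succeeds, so nothing is published for a change that was never saved.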
Aggregate Boundaries
Rule 1: One Aggregate Root per Aggregate
# CORRECT: Order is the only root
class Order(AggregateRoot):
    def __init__(self):
        self.items: List[OrderItem] = []  # Child objects

# WRONG: Making child an aggregate root too
class OrderItem(AggregateRoot):  # Don't do this!
    pass

Rule 2: Reference Other Aggregates by ID
# CORRECT: Reference Customer by ID
class Order(AggregateRoot):
    def __init__(self, customer_id: str):
        self.customer_id = customer_id  # ID reference

# WRONG: Embedding entire Customer aggregate
class Order(AggregateRoot):
    def __init__(self, customer: Customer):
        self.customer = customer  # Entire object - NO!

Rule 3: Small Aggregates
# GOOD: Small, focused aggregate
class Order(AggregateRoot):
    def __init__(self):
        self.items: List[OrderItem] = []  # 1-10 items typical
        self.delivery_address: Optional[DeliveryAddress] = None

# BAD: Huge aggregate
class Order(AggregateRoot):
    def __init__(self):
        self.customer: Optional[Customer] = None  # Entire customer!
        self.items: List[OrderItem] = []
        self.payments: List[Payment] = []  # Separate aggregate
        self.shipments: List[Shipment] = []  # Separate aggregate
        self.reviews: List[Review] = []  # Separate aggregate
        # Too big! Contention, performance issues
Event Sourcing with Aggregates
Neuroglia supports event-sourced aggregates:
from neuroglia.core import AggregateState

class OrderState(AggregateState):
    """
    State rebuilt from events.
    Each event handler updates state.
    """

    def __init__(self):
        super().__init__()
        self.customer_id: Optional[str] = None
        self.items: List[OrderItem] = []
        self.status = OrderStatus.PENDING

    # Event handlers rebuild state
    def on_order_created(self, event: OrderCreatedEvent):
        self.customer_id = event.customer_id
        self.status = OrderStatus.PENDING

    def on_item_added(self, event: ItemAddedToOrderEvent):
        item = OrderItem(
            event.pizza_name,
            event.size,
            event.quantity,
            event.price
        )
        self.items.append(item)

    def on_order_confirmed(self, event: OrderConfirmedEvent):
        self.status = OrderStatus.CONFIRMED

class Order(AggregateRoot):
    """Event-sourced aggregate."""

    def __init__(self, state: OrderState):
        super().__init__()
        self.state = state

    def add_item(self, pizza_name: str, size: PizzaSize, quantity: int, price: Decimal):
        # Validate against current state
        if self.state.status != OrderStatus.PENDING:
            raise InvalidOperationError("Cannot modify confirmed orders")
        # Apply event (updates state + records event)
        self.apply_event(ItemAddedToOrderEvent(
            order_id=self.id,
            pizza_name=pizza_name,
            size=size,
            quantity=quantity,
            price=price
        ))
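Rebuilding state amounts to a left fold over the event history: start from an empty state and apply each event in order. The sketch below shows that replay with plain dataclasses. The event classes, `SketchOrderState`, and `rehydrate` are hypothetical, and the `isinstance` dispatch is an assumption made for brevity; it does not reflect how Neuroglia routes events to handlers.

```python
from dataclasses import dataclass, field
from decimal import Decimal
from typing import List, Tuple, Union


@dataclass(frozen=True)
class Created:
    customer_id: str


@dataclass(frozen=True)
class ItemAdded:
    pizza_name: str
    quantity: int
    price: Decimal


@dataclass(frozen=True)
class Confirmed:
    pass


Event = Union[Created, ItemAdded, Confirmed]


@dataclass
class SketchOrderState:
    customer_id: str = ""
    status: str = "pending"
    items: List[Tuple[str, int, Decimal]] = field(default_factory=list)

    def apply(self, event: Event) -> None:
        # Dispatch on event type; each branch only updates state
        if isinstance(event, Created):
            self.customer_id = event.customer_id
        elif isinstance(event, ItemAdded):
            self.items.append((event.pizza_name, event.quantity, event.price))
        elif isinstance(event, Confirmed):
            self.status = "confirmed"

    def total(self) -> Decimal:
        return sum((qty * price for _, qty, price in self.items), Decimal("0"))


def rehydrate(history: List[Event]) -> SketchOrderState:
    """Replay events in order to rebuild the current state."""
    state = SketchOrderState()
    for event in history:
        state.apply(event)
    return state


history: List[Event] = [
    Created(customer_id="123"),
    ItemAdded("Margherita", 2, Decimal("15.99")),
    ItemAdded("Pepperoni", 1, Decimal("13.99")),
    Confirmed(),
]
state = rehydrate(history)
```

Validation stays in the aggregate's command methods; the handlers used during replay never reject an event, because the event records something that already happened.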
Testing Aggregates
Test Invariants
from decimal import Decimal

import pytest

def test_cannot_add_item_to_confirmed_order():
    """Test aggregate enforces consistency."""
    order = Order(customer_id="123")
    order.add_item("Margherita", PizzaSize.LARGE, 1, Decimal("15.99"))
    order.confirm()
    # Attempt to violate invariant
    with pytest.raises(InvalidOperationError, match="confirmed orders"):
        order.add_item("Pepperoni", PizzaSize.MEDIUM, 1, Decimal("13.99"))

def test_cannot_confirm_order_without_items():
    """Test aggregate enforces business rules."""
    order = Order(customer_id="123")
    with pytest.raises(InvalidOperationError, match="empty order"):
        order.confirm()

def test_order_total_always_consistent():
    """Test calculated fields always match items."""
    order = Order(customer_id="123")
    order.add_item("Margherita", PizzaSize.LARGE, 2, Decimal("15.99"))
    order.add_item("Pepperoni", PizzaSize.MEDIUM, 1, Decimal("13.99"))
    # Total should always match items
    expected = (Decimal("15.99") * 2) + Decimal("13.99")
    assert order.total() == expected

Test Domain Events
def test_aggregate_raises_events():
    """Test domain events are raised."""
    order = Order(customer_id="123")
    order.add_item("Margherita", PizzaSize.LARGE, 1, Decimal("15.99"))
    order.confirm()
    events = order.get_uncommitted_events()
    assert len(events) == 3  # Created, ItemAdded, Confirmed
    assert isinstance(events[0], OrderCreatedEvent)
    assert isinstance(events[1], ItemAddedToOrderEvent)
    assert isinstance(events[2], OrderConfirmedEvent)
Common Mistakes
1. Aggregates Too Large
# WRONG: Everything in one aggregate
class Customer(AggregateRoot):
    def __init__(self):
        self.orders: List[Order] = []  # All orders!
        self.payments: List[Payment] = []  # All payments!
        self.reviews: List[Review] = []  # All reviews!
        # Problem: Loading customer loads EVERYTHING

# RIGHT: Separate aggregates
class Customer(AggregateRoot):
    def __init__(self):
        self.name = ""
        self.email = ""

# Orders are separate aggregates
class Order(AggregateRoot):
    def __init__(self):
        self.customer_id = ""  # Reference by ID
2. Public Mutable Collections
# WRONG: Direct access to mutable list
class Order(AggregateRoot):
    def __init__(self):
        self.items: List[OrderItem] = []  # Public!

order.items.append(OrderItem(...))  # Bypasses validation!

# RIGHT: Private collection, controlled access
class Order(AggregateRoot):
    def __init__(self):
        self._items: List[OrderItem] = []  # Private

    def add_item(self, item: OrderItem):
        # Validation here
        self._items.append(item)

    @property
    def items(self) -> List[OrderItem]:
        return self._items.copy()  # Return copy
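Returning a copy keeps callers from bypassing the root, but a caller can still append to the copy and wrongly assume the order changed. One option, shown here as a sketch, is to expose a tuple so that such attempts fail loudly. `TupleBackedOrder` is a simplified, hypothetical stand-in that omits the Neuroglia base class and uses plain strings for items.

```python
from typing import List, Tuple


class TupleBackedOrder:
    def __init__(self) -> None:
        self._items: List[str] = []
        self._confirmed = False

    def add_item(self, name: str) -> None:
        # The only way to change the collection, so the rule always runs
        if self._confirmed:
            raise ValueError("Cannot modify confirmed orders")
        self._items.append(name)

    def confirm(self) -> None:
        self._confirmed = True

    @property
    def items(self) -> Tuple[str, ...]:
        # A tuple has no append(), so misuse raises AttributeError
        return tuple(self._items)


order = TupleBackedOrder()
order.add_item("Margherita")
order.confirm()

try:
    order.items.append("Pepperoni")  # type: ignore[attr-defined]
    bypassed = True
except AttributeError:
    bypassed = False
```

Whether a silent copy or a failing tuple is preferable depends on the codebase; the tuple trades a little convenience for earlier detection of misuse.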
3. Violating Aggregate Boundaries
# WRONG: Modifying another aggregate's internals
order = order_repository.get(order_id)
customer = customer_repository.get(order.customer_id)
customer.orders.append(order)  # Mutates Customer's collection from outside!

# RIGHT: Each aggregate modifies itself
order = order_repository.get(order_id)
order.confirm()  # Order modifies itself

# Customer reacts via event
class OrderConfirmedHandler:
    def __init__(self, customer_repo):
        self.customer_repo = customer_repo

    async def handle(self, event: OrderConfirmedEvent):
        customer = await self.customer_repo.get(event.customer_id)
        customer.record_order(event.order_id)  # Customer modifies itself
        await self.customer_repo.save(customer)
4. Modifying Multiple Aggregates in One Transaction
# WRONG: Modifying several aggregates in one transaction
async def transfer_order(order_id: str, new_customer_id: str):
    order = await order_repo.get(order_id)
    old_customer = await customer_repo.get(order.customer_id)
    new_customer = await customer_repo.get(new_customer_id)
    old_customer.remove_order(order_id)
    new_customer.add_order(order_id)
    order.customer_id = new_customer_id
    await order_repo.save(order)
    await customer_repo.save(old_customer)
    await customer_repo.save(new_customer)
    # Problem: What if one save fails?

# RIGHT: Use eventual consistency via events
async def transfer_order(order_id: str, new_customer_id: str):
    order = await order_repo.get(order_id)
    order.transfer_to_customer(new_customer_id)  # Raises event
    await order_repo.save(order)
    # Customers update via event handlers (eventually consistent)
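The event-driven version can be sketched end to end with a tiny in-process dispatcher. Everything here is hypothetical scaffolding: `OrderTransferred`, `EventBus`, and the dict-backed customer records are assumptions made for illustration. A real system would publish through its messaging infrastructure, and each handler would load and save its own aggregate in a separate transaction.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Dict, List, Set


@dataclass(frozen=True)
class OrderTransferred:
    order_id: str
    old_customer_id: str
    new_customer_id: str


Handler = Callable[[OrderTransferred], Awaitable[None]]


class EventBus:
    """Minimal dispatcher: delivers each event to every subscribed handler."""

    def __init__(self) -> None:
        self._handlers: List[Handler] = []

    def subscribe(self, handler: Handler) -> None:
        self._handlers.append(handler)

    async def publish(self, event: OrderTransferred) -> None:
        for handler in self._handlers:
            await handler(event)


# Stand-in for the Customer aggregates: customer_id -> set of order IDs
customer_orders: Dict[str, Set[str]] = {"alice": {"o-1"}, "bob": set()}


async def remove_from_old_customer(event: OrderTransferred) -> None:
    customer_orders[event.old_customer_id].discard(event.order_id)


async def add_to_new_customer(event: OrderTransferred) -> None:
    customer_orders[event.new_customer_id].add(event.order_id)


async def main() -> None:
    bus = EventBus()
    bus.subscribe(remove_from_old_customer)
    bus.subscribe(add_to_new_customer)
    # The command handler saves only the Order, then publishes its event
    await bus.publish(OrderTransferred("o-1", "alice", "bob"))


asyncio.run(main())
```

Each handler touches exactly one customer, so a failure in one can be retried without rolling back the order or the other customer.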
When NOT to Use Aggregates
Aggregates add complexity. Skip them when:
- Simple CRUD: No business rules, just data entry
- Reporting: Read-only queries don't need aggregates
- No Invariants: If there are no consistency rules
- Single Entity: If entity has no relationships
- Prototypes: Quick experiments
For simple cases, plain entities work fine.
Key Takeaways
- Aggregate Root: Single entry point for aggregate
- Consistency Boundary: Aggregate maintains invariants
- Transactional Unit: Save/load aggregate as one unit
- Small Aggregates: Keep aggregates focused and small
- ID References: Reference other aggregates by ID, not object
Aggregates + Other Patterns
Aggregate Root (Entity)
    ↓ uses
Value Objects (immutable children)
    ↓ raises
Domain Events (state changes)
    ↓ persisted by
Repository (loads/saves aggregate)
    ↓ coordinated by
Unit of Work (transaction boundary)
Next Steps
- See it implemented: Tutorial Part 2 builds Order aggregate
- Understand persistence: Repository Pattern for saving aggregates
- Event handling: Event-Driven Architecture for aggregate events
Further Reading
- Vaughn Vernon's "Implementing Domain-Driven Design" (Chapter 10)
- Martin Fowler's "DDD Aggregate"
- Effective Aggregate Design (Vernon)