Data Layer Architecture¶
This document explains the data layer architecture of the Starter App, covering domain abstractions, repository patterns, and how they integrate with Domain-Driven Design (DDD) principles using the Neuroglia framework.
Overview¶
The data layer in this application follows a clean architecture approach with clear separation between:
- Domain Layer - Abstract interfaces and entity definitions (what)
- Integration Layer - Concrete implementations (how)
- Infrastructure - Technical adapters and persistence mechanisms
graph TB
subgraph "Application Layer"
CH[Command Handlers]
QH[Query Handlers]
end
subgraph "Domain Layer (Abstractions)"
AR[AggregateRoot<br/>Task]
AS[AggregateState<br/>TaskState]
DE[DomainEvents]
RI[Repository Interface<br/>TaskRepository]
end
subgraph "Integration Layer (Implementations)"
MR[MongoTaskRepository]
IMR[InMemoryTaskRepository]
end
subgraph "Infrastructure"
DB[(MongoDB)]
MEM[Memory Store]
end
CH --> RI
QH --> RI
AR --> AS
AR --> DE
RI -.implements.-> MR
RI -.implements.-> IMR
MR --> DB
IMR --> MEM
style AR fill:#e1f5ff
style AS fill:#e1f5ff
style DE fill:#e1f5ff
style RI fill:#fff3cd
Domain-Driven Design Concepts¶
AggregateRoot: Behavior Container¶
The AggregateRoot is the entry point for all domain operations. It encapsulates business logic and ensures consistency within the aggregate boundary.
Key Characteristics:
- ✅ Contains business methods that enforce invariants
- ✅ Generates domain events for state changes
- ✅ Maintains aggregate boundary and identity
- ✅ Delegates state storage to
AggregateState
Example: Task Aggregate
from neuroglia.data.abstractions import AggregateRoot
class Task(AggregateRoot[TaskState, str]):
"""Task aggregate root - encapsulates task behavior."""
def __init__(
self,
title: str,
description: str,
status: TaskStatus = TaskStatus.PENDING,
priority: TaskPriority = TaskPriority.MEDIUM,
# ... other parameters
) -> None:
super().__init__()
aggregate_id = task_id or str(uuid4())
# Register domain event for creation
self.state.on(
self.register_event(
TaskCreatedDomainEvent(
aggregate_id=aggregate_id,
title=title,
description=description,
status=status,
priority=priority,
# ...
)
)
)
def update_title(self, new_title: str) -> None:
"""Business method to update title."""
if not new_title or not new_title.strip():
raise ValueError("Title cannot be empty")
# Register event
event = TaskTitleUpdatedDomainEvent(
aggregate_id=self.id(),
new_title=new_title
)
# Apply state change through event
self.state.on(self.register_event(event))
def update_status(self, new_status: TaskStatus) -> None:
"""Business method to update status."""
# Enforce business rules
if self.state.status == TaskStatus.COMPLETED:
raise ValueError("Cannot modify completed tasks")
event = TaskStatusUpdatedDomainEvent(
aggregate_id=self.id(),
new_status=new_status
)
self.state.on(self.register_event(event))
Why This Matters:
- Business rules are centralized in the aggregate
- State changes always go through domain events
- Aggregate maintains consistency of its data
- External code cannot bypass business logic
AggregateState: Attribute Storage¶
The AggregateState stores the data attributes and applies events to update state. It's purely about data, not behavior.
Key Characteristics:
- ✅ Holds all persisted attributes
- ✅ Implements event handlers (
onmethods) to apply events - ✅ No business logic - pure data container
- ✅ Supports event sourcing through event replay
Example: TaskState
from neuroglia.data.abstractions import AggregateState
from multipledispatch import dispatch
class TaskState(AggregateState[str]):
"""Encapsulates the persisted state for the Task aggregate."""
# All persisted attributes
id: str
title: str
description: str
status: TaskStatus
priority: TaskPriority
assignee_id: Optional[str]
department: Optional[str]
created_at: datetime
updated_at: datetime
created_by: Optional[str]
def __init__(self) -> None:
super().__init__()
# Initialize with defaults
self.id = ""
self.title = ""
self.description = ""
self.status = TaskStatus.PENDING
self.priority = TaskPriority.MEDIUM
# ...
@dispatch(TaskCreatedDomainEvent)
def on(self, event: TaskCreatedDomainEvent) -> None:
"""Apply creation event to state."""
self.id = event.aggregate_id
self.title = event.title
self.description = event.description
self.status = event.status
self.priority = event.priority
# ...
@dispatch(TaskTitleUpdatedDomainEvent)
def on(self, event: TaskTitleUpdatedDomainEvent) -> None:
"""Apply title update event to state."""
self.title = event.new_title
self.updated_at = datetime.now(timezone.utc)
@dispatch(TaskStatusUpdatedDomainEvent)
def on(self, event: TaskStatusUpdatedDomainEvent) -> None:
"""Apply status update event to state."""
self.status = event.new_status
self.updated_at = datetime.now(timezone.utc)
Benefits:
- Clean separation between behavior (AggregateRoot) and data (AggregateState)
- Easy to serialize/deserialize for persistence
- Supports event sourcing by replaying events
- Type-safe attribute access through Python type hints
DomainEvents: Decoupling Side Effects¶
Domain events represent facts about what happened in the domain. They decouple the aggregate from side effects like notifications, integrations, or updating read models.
Key Characteristics:
- ✅ Immutable records of what happened
- ✅ Named in past tense (TaskCreated, TitleUpdated)
- ✅ Contain all data needed to process the event
- ✅ Published automatically by repositories after persistence
Example: Domain Events
from neuroglia.data.abstractions import DomainEvent
from neuroglia.eventing.cloud_events.decorators import cloudevent
@cloudevent("task.created.v1")
@dataclass
class TaskCreatedDomainEvent(DomainEvent):
"""Event raised when a new task is created."""
aggregate_id: str
title: str
description: str
status: TaskStatus
priority: TaskPriority
assignee_id: Optional[str]
department: Optional[str]
created_at: datetime
updated_at: datetime
created_by: Optional[str]
def __init__(self, aggregate_id: str, title: str, ...):
super().__init__(aggregate_id)
self.aggregate_id = aggregate_id
self.title = title
# ...
@cloudevent("task.title.updated.v1")
@dataclass
class TaskTitleUpdatedDomainEvent(DomainEvent):
"""Event raised when task title is updated."""
aggregate_id: str
new_title: str
def __init__(self, aggregate_id: str, new_title: str):
super().__init__(aggregate_id)
self.aggregate_id = aggregate_id
self.new_title = new_title
Event Flow:
1. Aggregate method called → register_event()
2. Event applied to state → state.on(event)
3. Aggregate persisted → repository.add_async() or update_async()
4. Repository publishes events → mediator.publish()
5. Event handlers react → Send notification, update cache, etc.
Benefits:
- Loose coupling between aggregates and side effects
- Audit trail of all domain changes
- Easy to add new event handlers without modifying aggregates
- Supports event-driven architecture and CQRS
- CloudEvent format enables cross-platform integration
Repository Pattern¶
Abstract Repository Interface¶
The repository interface defines the contract for data access without exposing implementation details.
Location: src/domain/repositories/task_repository.py
from abc import ABC, abstractmethod
from domain.entities import Task
class TaskRepository(ABC):
"""Abstract repository for Task entities."""
@abstractmethod
async def get_all_async(self) -> list[Task]:
"""Retrieve all tasks."""
pass
@abstractmethod
async def get_by_id_async(self, task_id: str) -> Task | None:
"""Retrieve a task by ID."""
pass
@abstractmethod
async def get_by_assignee_async(self, assignee_id: str) -> list[Task]:
"""Retrieve tasks assigned to a specific user."""
pass
@abstractmethod
async def get_by_department_async(self, department: str) -> list[Task]:
"""Retrieve tasks for a specific department."""
pass
@abstractmethod
async def add_async(self, entity: Task) -> Task:
"""Add a new task."""
pass
@abstractmethod
async def update_async(self, entity: Task) -> Task:
"""Update an existing task."""
pass
@abstractmethod
async def delete_async(self, task_id: str, task: Task | None = None) -> bool:
"""Delete a task by ID."""
pass
Design Principles:
- ✅ Technology-agnostic (no MongoDB/SQL specifics)
- ✅ Focused on domain operations (not CRUD)
- ✅ Async methods for non-blocking I/O
- ✅ Returns domain entities, not DTOs or dicts
- ✅ Lives in domain layer (pure abstraction)
Concrete Implementations¶
In-Memory Repository (Testing)¶
Location: src/integration/repositories/in_memory_task_repository.py
from domain.entities import Task
from domain.repositories import TaskRepository
class InMemoryTaskRepository(TaskRepository):
"""In-memory implementation for testing."""
def __init__(self) -> None:
self._tasks: dict[str, Task] = {}
async def get_all_async(self) -> list[Task]:
return list(self._tasks.values())
async def get_by_id_async(self, task_id: str) -> Task | None:
return self._tasks.get(task_id)
async def get_by_assignee_async(self, assignee_id: str) -> list[Task]:
return [
task for task in self._tasks.values()
if task.state.assignee_id == assignee_id
]
async def get_by_department_async(self, department: str) -> list[Task]:
return [
task for task in self._tasks.values()
if task.state.department == department
]
async def add_async(self, entity: Task) -> Task:
self._tasks[entity.id()] = entity
return entity
async def update_async(self, entity: Task) -> Task:
self._tasks[entity.id()] = entity
return entity
async def delete_async(self, task_id: str, task: Task | None = None) -> bool:
if task_id in self._tasks:
del self._tasks[task_id]
return True
return False
Use Cases:
- ✅ Unit testing without external dependencies
- ✅ Fast test execution
- ✅ Deterministic test behavior
- ✅ Local development without Docker
MongoDB Repository (Production)¶
Location: src/integration/repositories/motor_task_repository.py
from motor.motor_asyncio import AsyncIOMotorClient
from neuroglia.data.infrastructure.mongo import MotorRepository
from neuroglia.data.infrastructure.tracing_mixin import TracedRepositoryMixin
from domain.entities import Task
from domain.repositories import TaskRepository
class MongoTaskRepository(
TracedRepositoryMixin,
MotorRepository[Task, str],
TaskRepository
):
"""MongoDB repository with automatic event publishing and tracing."""
def __init__(
self,
client: AsyncIOMotorClient,
database_name: str,
collection_name: str,
serializer: JsonSerializer,
entity_type: Optional[type[Task]] = None,
mediator: Optional["Mediator"] = None,
):
super().__init__(
client=client,
database_name=database_name,
collection_name=collection_name,
serializer=serializer,
entity_type=entity_type,
mediator=mediator,
)
# Inherits from MotorRepository:
# - get_async(id) → automatic event publishing
# - add_async(entity) → automatic event publishing
# - update_async(entity) → automatic event publishing
# - remove_async(id)
async def get_all_async(self) -> list[Task]:
"""Custom query: retrieve all tasks."""
cursor = self.collection.find({})
tasks = []
async for document in cursor:
task = self._deserialize_entity(document)
tasks.append(task)
return tasks
async def get_by_id_async(self, task_id: str) -> Task | None:
"""Delegates to inherited get_async."""
return await self.get_async(task_id)
async def get_by_assignee_async(self, assignee_id: str) -> list[Task]:
"""Custom query: find by assignee."""
cursor = self.collection.find({"assignee_id": assignee_id})
tasks = []
async for document in cursor:
task = self._deserialize_entity(document)
tasks.append(task)
return tasks
async def get_by_department_async(self, department: str) -> list[Task]:
"""Custom query: find by department."""
cursor = self.collection.find({"department": department})
tasks = []
async for document in cursor:
task = self._deserialize_entity(document)
tasks.append(task)
return tasks
async def delete_async(self, task_id: str, task: Task | None = None) -> bool:
"""Custom delete with optional event publishing."""
try:
entity = task if task else await self.get_async(task_id)
if entity:
# Publish domain events (including TaskDeletedDomainEvent)
await self._publish_domain_events(entity)
# Physical deletion
await self.remove_async(task_id)
return True
except Exception:
return False
Key Features:
- ✅ Automatic Event Publishing: MotorRepository publishes domain events after persistence
- ✅ Tracing: TracedRepositoryMixin adds OpenTelemetry spans automatically
- ✅ Inheritance: Reuses CRUD operations from base repository
- ✅ Custom Queries: Adds domain-specific query methods
- ✅ Type Safety: Full type hints for entities and IDs
Creating New Domain Entities¶
Step 1: Define Domain Events¶
# src/domain/events/order.py
from dataclasses import dataclass
from datetime import datetime
from neuroglia.data.abstractions import DomainEvent
from neuroglia.eventing.cloud_events.decorators import cloudevent
@cloudevent("order.created.v1")
@dataclass
class OrderCreatedDomainEvent(DomainEvent):
"""Event raised when an order is created."""
aggregate_id: str
customer_id: str
total_amount: float
items: list[dict]
created_at: datetime
def __init__(self, aggregate_id: str, customer_id: str, total_amount: float, items: list[dict], created_at: datetime):
super().__init__(aggregate_id)
self.aggregate_id = aggregate_id
self.customer_id = customer_id
self.total_amount = total_amount
self.items = items
self.created_at = created_at
@cloudevent("order.confirmed.v1")
@dataclass
class OrderConfirmedDomainEvent(DomainEvent):
"""Event raised when an order is confirmed."""
aggregate_id: str
confirmed_at: datetime
def __init__(self, aggregate_id: str, confirmed_at: datetime):
super().__init__(aggregate_id)
self.aggregate_id = aggregate_id
self.confirmed_at = confirmed_at
Step 2: Create AggregateState¶
# src/domain/entities/order.py
from datetime import datetime, timezone
from typing import Optional
from multipledispatch import dispatch
from neuroglia.data.abstractions import AggregateState
from domain.events.order import OrderCreatedDomainEvent, OrderConfirmedDomainEvent
class OrderState(AggregateState[str]):
"""Persisted state for Order aggregate."""
id: str
customer_id: str
total_amount: float
items: list[dict]
status: str # "pending", "confirmed", "shipped", "delivered"
created_at: datetime
confirmed_at: Optional[datetime]
def __init__(self) -> None:
super().__init__()
self.id = ""
self.customer_id = ""
self.total_amount = 0.0
self.items = []
self.status = "pending"
self.created_at = datetime.now(timezone.utc)
self.confirmed_at = None
@dispatch(OrderCreatedDomainEvent)
def on(self, event: OrderCreatedDomainEvent) -> None:
"""Apply creation event."""
self.id = event.aggregate_id
self.customer_id = event.customer_id
self.total_amount = event.total_amount
self.items = event.items
self.status = "pending"
self.created_at = event.created_at
@dispatch(OrderConfirmedDomainEvent)
def on(self, event: OrderConfirmedDomainEvent) -> None:
"""Apply confirmation event."""
self.status = "confirmed"
self.confirmed_at = event.confirmed_at
Step 3: Create AggregateRoot¶
# src/domain/entities/order.py (continued)
from uuid import uuid4
from neuroglia.data.abstractions import AggregateRoot
class Order(AggregateRoot[OrderState, str]):
"""Order aggregate root."""
def __init__(
self,
customer_id: str,
items: list[dict],
order_id: Optional[str] = None,
) -> None:
super().__init__()
# Calculate total
total_amount = sum(item["price"] * item["quantity"] for item in items)
# Validate business rules
if total_amount <= 0:
raise ValueError("Order total must be positive")
if not items:
raise ValueError("Order must contain items")
aggregate_id = order_id or str(uuid4())
# Register creation event
self.state.on(
self.register_event(
OrderCreatedDomainEvent(
aggregate_id=aggregate_id,
customer_id=customer_id,
total_amount=total_amount,
items=items,
created_at=datetime.now(timezone.utc),
)
)
)
def id(self) -> str:
"""Return aggregate identifier."""
return self.state.id
def confirm(self) -> None:
"""Business method to confirm order."""
# Enforce business rules
if self.state.status != "pending":
raise ValueError("Can only confirm pending orders")
if not self.state.items:
raise ValueError("Cannot confirm empty order")
# Register event
event = OrderConfirmedDomainEvent(
aggregate_id=self.id(),
confirmed_at=datetime.now(timezone.utc)
)
self.state.on(self.register_event(event))
Step 4: Define Repository Interface¶
# src/domain/repositories/order_repository.py
from abc import ABC, abstractmethod
from domain.entities import Order
class OrderRepository(ABC):
"""Abstract repository for Order entities."""
@abstractmethod
async def get_by_id_async(self, order_id: str) -> Order | None:
"""Retrieve order by ID."""
pass
@abstractmethod
async def get_by_customer_async(self, customer_id: str) -> list[Order]:
"""Retrieve orders for a customer."""
pass
@abstractmethod
async def get_pending_orders_async(self) -> list[Order]:
"""Retrieve all pending orders."""
pass
@abstractmethod
async def add_async(self, entity: Order) -> Order:
"""Add new order."""
pass
@abstractmethod
async def update_async(self, entity: Order) -> Order:
"""Update existing order."""
pass
Step 5: Implement Repository¶
# src/integration/repositories/mongo_order_repository.py
from motor.motor_asyncio import AsyncIOMotorClient
from neuroglia.data.infrastructure.mongo import MotorRepository
from neuroglia.data.infrastructure.tracing_mixin import TracedRepositoryMixin
from neuroglia.serialization.json import JsonSerializer
from domain.entities import Order
from domain.repositories import OrderRepository
class MongoOrderRepository(
TracedRepositoryMixin,
MotorRepository[Order, str],
OrderRepository
):
"""MongoDB implementation of OrderRepository."""
def __init__(
self,
client: AsyncIOMotorClient,
database_name: str,
collection_name: str,
serializer: JsonSerializer,
entity_type: Optional[type[Order]] = None,
mediator: Optional["Mediator"] = None,
):
super().__init__(
client=client,
database_name=database_name,
collection_name=collection_name,
serializer=serializer,
entity_type=entity_type,
mediator=mediator,
)
async def get_by_id_async(self, order_id: str) -> Order | None:
"""Retrieve order by ID."""
return await self.get_async(order_id)
async def get_by_customer_async(self, customer_id: str) -> list[Order]:
"""Retrieve orders for a customer."""
cursor = self.collection.find({"customer_id": customer_id})
orders = []
async for document in cursor:
order = self._deserialize_entity(document)
orders.append(order)
return orders
async def get_pending_orders_async(self) -> list[Order]:
"""Retrieve pending orders."""
cursor = self.collection.find({"status": "pending"})
orders = []
async for document in cursor:
order = self._deserialize_entity(document)
orders.append(order)
return orders
Step 6: Register with Dependency Injection¶
# src/main.py or appropriate configuration location
from neuroglia.dependency_injection import ServiceCollection
from domain.repositories import OrderRepository
from integration.repositories import MongoOrderRepository
# Register repository
services.add_singleton(
OrderRepository,
lambda sp: MongoOrderRepository(
client=sp.get_required_service(AsyncIOMotorClient),
database_name="starter_app",
collection_name="orders",
serializer=sp.get_required_service(JsonSerializer),
entity_type=Order,
mediator=sp.get_required_service(Mediator),
)
)
Best Practices¶
1. Aggregate Design¶
✅ DO:
- Keep aggregates small and focused
- Enforce invariants within aggregate boundaries
- Generate events for all state changes
- Use value objects for complex attributes
❌ DON'T:
- Load multiple aggregates in one transaction
- Modify aggregates directly from outside
- Skip event generation for state changes
- Put business logic in state classes
2. Domain Events¶
✅ DO:
- Name events in past tense (OrderCreated, not CreateOrder)
- Include all data needed by event handlers
- Use immutable dataclasses
- Version events with CloudEvent type (v1, v2)
❌ DON'T:
- Mutate events after creation
- Reference other aggregates in events
- Include behavior in event classes
- Skip event versioning
3. Repository Pattern¶
✅ DO:
- Define interfaces in domain layer
- Implement in integration layer
- Use async methods for I/O
- Return domain entities, not dictionaries
❌ DON'T:
- Leak infrastructure details into domain
- Mix query and command operations
- Return DTOs from repositories
- Bypass repository to access database directly
4. State Management¶
✅ DO:
- Apply all state changes through events
- Use
@dispatchdecorators for event handlers - Initialize state with sensible defaults
- Keep state classes focused on data
❌ DON'T:
- Modify state directly from aggregate
- Add business logic to state classes
- Skip event handlers for state changes
- Use mutable collections without copying
Testing Strategies¶
Unit Testing Aggregates¶
def test_order_confirmation():
# Arrange
order = Order(
customer_id="customer-1",
items=[{"name": "Widget", "price": 10.0, "quantity": 2}]
)
# Act
order.confirm()
# Assert
assert order.state.status == "confirmed"
assert order.state.confirmed_at is not None
assert len(order.pending_events) == 2 # Created + Confirmed
def test_order_validation():
# Assert business rule enforcement
with pytest.raises(ValueError, match="Order total must be positive"):
Order(customer_id="customer-1", items=[])
Integration Testing Repositories¶
@pytest.mark.asyncio
async def test_order_persistence(order_repository: OrderRepository):
# Arrange
order = Order(
customer_id="customer-1",
items=[{"name": "Widget", "price": 10.0, "quantity": 2}]
)
# Act
saved_order = await order_repository.add_async(order)
retrieved_order = await order_repository.get_by_id_async(saved_order.id())
# Assert
assert retrieved_order is not None
assert retrieved_order.state.customer_id == "customer-1"
assert retrieved_order.state.total_amount == 20.0
Troubleshooting¶
Common Issues¶
Problem: Events not being published
Solution: Ensure repository has mediator injected and inherits from MotorRepository
# ✅ Correct
class MongoOrderRepository(MotorRepository[Order, str], OrderRepository):
def __init__(self, ..., mediator: Mediator):
super().__init__(..., mediator=mediator)
# ❌ Missing mediator
class MongoOrderRepository(OrderRepository):
def __init__(self, ...): # No mediator parameter
pass
Problem: State not updating after event
Solution: Ensure @dispatch decorator is used and event is registered
# ✅ Correct
@dispatch(OrderConfirmedDomainEvent)
def on(self, event: OrderConfirmedDomainEvent) -> None:
self.status = "confirmed"
# Also in aggregate:
event = OrderConfirmedDomainEvent(...)
self.state.on(self.register_event(event)) # register_event is key!
Problem: Type checker warnings with MotorRepository
Solution: This is a known limitation with generic variance. Use type: ignore or cast:
async def get_by_id_async(self, order_id: str) -> Order | None:
return cast(Order | None, await self.get_async(order_id))
Related Documentation¶
- CQRS Pattern - Command and query separation
- Dependency Injection - DI patterns and setup
- Architecture Overview - High-level architecture concepts
- Testing Guide - Testing strategies and examples
Additional Resources¶
Neuroglia Framework¶
- Neuroglia Documentation - Framework documentation
- AggregateRoot Pattern - Aggregate pattern details
- Repository Pattern - Repository implementation guide
Domain-Driven Design¶
- DDD Aggregates - Martin Fowler on aggregates
- Event Sourcing - Event sourcing pattern
- Repository Pattern - Classic pattern description