Part 6: Persistence & Repositories
Time: 45 minutes | Prerequisites: Part 5
In this tutorial, you'll implement data persistence using the Repository pattern with MongoDB. You'll learn how to abstract data access and keep a clean separation between the domain and infrastructure layers.
What You'll Learn
- The Repository pattern and why it matters
- MongoDB integration with Motor (async driver)
- Implementing repositories for aggregates
- Persisting data through repositories with automatic event publishing
- Testing data access layers
Understanding the Repository Pattern
The Problem
Without repositories, domain logic is polluted with database code:
# Bad: the domain entity knows about MongoDB
class Order(AggregateRoot):
    async def save(self):
        collection = mongo_client.db.orders
        await collection.insert_one(self.__dict__)  # persistence code inside the domain
Problems:
- Domain depends on infrastructure (MongoDB)
- Can't test without database
- Can't swap database implementations
- Violates clean architecture
The Solution: Repository Pattern
Repositories abstract data access behind interfaces:
┌──────────────┐
│   Domain     │  Uses interface
│   (Order)    │─────────┐
└──────────────┘         │
                         ▼
                ┌────────────────┐
                │  IOrderRepo    │  (interface)
                │  - get()       │
                │  - add()       │
                │  - list()      │
                └────────┬───────┘
                         │
       ┌─────────────────┼─────────────────┐
       ▼                 ▼                 ▼
┌──────────────┐  ┌──────────────┐  ┌──────────────┐
│MongoOrderRepo│  │   FileRepo   │  │ InMemoryRepo │
│              │  │              │  │              │
└──────────────┘  └──────────────┘  └──────────────┘
Benefits:
- Testability: Use in-memory repo for tests
- Flexibility: Swap implementations without changing domain
- Clean Architecture: Domain doesn't depend on infrastructure
- Consistency: Standard interface for all data access
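The swap-ability described above can be sketched with the standard library alone. Everything in this example (`NoteRepository`, `InMemoryNoteRepository`, `JsonFileNoteRepository`, `copy_note`) is a hypothetical name invented for illustration, not part of the tutorial's codebase or of Neuroglia. The point is only that the calling code is written once against the interface and runs unchanged with either backend.

```python
import asyncio
import json
import tempfile
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Dict, List, Optional


class NoteRepository(ABC):
    """Hypothetical interface: the only thing calling code depends on."""

    @abstractmethod
    async def get_async(self, note_id: str) -> Optional[str]: ...

    @abstractmethod
    async def add_async(self, note_id: str, text: str) -> None: ...


class InMemoryNoteRepository(NoteRepository):
    """Keeps notes in a dict; handy for unit tests."""

    def __init__(self) -> None:
        self._notes: Dict[str, str] = {}

    async def get_async(self, note_id: str) -> Optional[str]:
        return self._notes.get(note_id)

    async def add_async(self, note_id: str, text: str) -> None:
        self._notes[note_id] = text


class JsonFileNoteRepository(NoteRepository):
    """Stores all notes in one JSON file (blocking I/O, acceptable for a sketch)."""

    def __init__(self, path: Path) -> None:
        self._path = path

    def _load(self) -> Dict[str, str]:
        if not self._path.exists():
            return {}
        return json.loads(self._path.read_text(encoding="utf-8"))

    async def get_async(self, note_id: str) -> Optional[str]:
        return self._load().get(note_id)

    async def add_async(self, note_id: str, text: str) -> None:
        notes = self._load()
        notes[note_id] = text
        self._path.write_text(json.dumps(notes), encoding="utf-8")


async def copy_note(repo: NoteRepository, source_id: str, target_id: str) -> bool:
    """Calling code: knows the interface, not the storage technology."""
    text = await repo.get_async(source_id)
    if text is None:
        return False
    await repo.add_async(target_id, text)
    return True


async def demo(repo: NoteRepository) -> Optional[str]:
    await repo.add_async("n1", "extra basil")
    await copy_note(repo, "n1", "n2")
    return await repo.get_async("n2")


def run_with_both_backends() -> List[Optional[str]]:
    """Run the same calling code against both implementations."""
    with tempfile.TemporaryDirectory() as tmp:
        file_repo = JsonFileNoteRepository(Path(tmp) / "notes.json")
        return [
            asyncio.run(demo(InMemoryNoteRepository())),
            asyncio.run(demo(file_repo)),
        ]
```

Neither `copy_note` nor `demo` changes when the backend does, which is the property the in-memory test repository later in this part relies on.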
Defining Repository Interfaces
Step 1: Create Repository Interface
Create domain/repositories/__init__.py:
"""Repository interfaces for domain entities"""
from abc import ABC, abstractmethod
from typing import List, Optional

from domain.entities import Order


class IOrderRepository(ABC):
    """
    Interface for Order persistence.

    Domain defines the contract, infrastructure implements it.
    """

    @abstractmethod
    async def get_async(self, order_id: str) -> Optional[Order]:
        """Get order by ID"""
        pass

    @abstractmethod
    async def add_async(self, order: Order) -> None:
        """Add new order"""
        pass

    @abstractmethod
    async def update_async(self, order: Order) -> None:
        """Update existing order"""
        pass

    @abstractmethod
    async def delete_async(self, order_id: str) -> None:
        """Delete order"""
        pass

    @abstractmethod
    async def list_async(self) -> List[Order]:
        """Get all orders"""
        pass

    @abstractmethod
    async def find_by_status_async(self, status: str) -> List[Order]:
        """Find orders by status"""
        pass
Key points:
- Interface only: No implementation details
- Domain types: Works with Order entities, not dicts/documents
- Async: All methods are async for non-blocking I/O
- Business queries: find_by_status_async reflects business needs
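One practical consequence of declaring the contract with `ABC` and `@abstractmethod` is that Python refuses to instantiate an implementation that forgets a method. The sketch below uses a hypothetical two-method interface (`IMenuRepository`) rather than the tutorial's `IOrderRepository`, so that it runs without the project's domain package.

```python
from abc import ABC, abstractmethod
from typing import List, Optional


class IMenuRepository(ABC):
    """Hypothetical interface with two abstract methods."""

    @abstractmethod
    async def get_async(self, item_id: str) -> Optional[str]: ...

    @abstractmethod
    async def list_async(self) -> List[str]: ...


class IncompleteMenuRepository(IMenuRepository):
    """Forgets to implement list_async."""

    async def get_async(self, item_id: str) -> Optional[str]:
        return None


class CompleteMenuRepository(IMenuRepository):
    """Implements every abstract method."""

    async def get_async(self, item_id: str) -> Optional[str]:
        return None

    async def list_async(self) -> List[str]:
        return []


def can_instantiate(cls: type) -> bool:
    """Return True if cls() succeeds, False if the ABC machinery rejects it."""
    try:
        cls()
    except TypeError:
        # Raised when abstract methods are still unimplemented
        return False
    return True
```

Here `can_instantiate(CompleteMenuRepository)` succeeds while `can_instantiate(IncompleteMenuRepository)` does not, so a missing method surfaces when the object is created rather than when the method is first called.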
MongoDB Implementation
Step 1: Install Motor
Motor is the async MongoDB driver for Python. Install it with your package manager, for example:
poetry add motor
Step 2: Implement MongoDB Repository
Create integration/repositories/mongo_order_repository.py:
"""MongoDB implementation of IOrderRepository"""
from typing import List, Optional

from motor.motor_asyncio import AsyncIOMotorCollection

from domain.entities import Order, OrderStatus
from domain.repositories import IOrderRepository
from neuroglia.data.infrastructure.mongo import MotorRepository


class MongoOrderRepository(
    MotorRepository[Order, str],
    IOrderRepository
):
    """
    MongoDB implementation of order repository.

    MotorRepository provides:
    - Automatic serialization/deserialization
    - CRUD operations
    - Query helpers
    """

    def __init__(self, collection: AsyncIOMotorCollection):
        """
        Initialize with MongoDB collection.

        Collection is injected by DI container.
        """
        super().__init__(collection, Order, str)

    async def find_by_status_async(
        self,
        status: str
    ) -> List[Order]:
        """Find orders by status"""
        # Convert status string to enum
        order_status = OrderStatus(status.lower())

        # Query MongoDB
        cursor = self.collection.find({"state.status": order_status.value})

        # Deserialize to Order entities
        orders = []
        async for doc in cursor:
            order = await self._deserialize_async(doc)
            orders.append(order)
        return orders

    async def find_active_orders_async(self) -> List[Order]:
        """Find orders that are not delivered or cancelled"""
        active_statuses = [
            OrderStatus.PENDING.value,
            OrderStatus.CONFIRMED.value,
            OrderStatus.COOKING.value,
            OrderStatus.READY.value,
            OrderStatus.DELIVERING.value,
        ]
        cursor = self.collection.find({
            "state.status": {"$in": active_statuses}
        })

        orders = []
        async for doc in cursor:
            order = await self._deserialize_async(doc)
            orders.append(order)
        return orders
What MotorRepository provides:
- get_async(id): Get by ID
- add_async(entity): Insert new entity
- update_async(entity): Update existing entity
- delete_async(id): Delete by ID
- list_async(): Get all entities
- Automatic serialization using JsonSerializer
- Automatic deserialization to domain entities
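To make the serialization round trip and the `"state.status"` query path more concrete, here is a stdlib-only sketch. The document layout (aggregate state nested under a `state` key, enum stored by value) is an assumption inferred from the queries in the repository above; the real serializer's output may differ. `SketchStatus`, `SketchOrderState`, `to_document`, `from_document`, and `matches` are hypothetical helpers, and `matches` mimics only a tiny subset of MongoDB filter semantics (equality and `$in` on dotted paths).

```python
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict

_MISSING = object()  # sentinel for "path not present in the document"


class SketchStatus(Enum):
    PENDING = "pending"
    COOKING = "cooking"
    DELIVERED = "delivered"


@dataclass
class SketchOrderState:
    id: str
    customer_id: str
    status: SketchStatus = SketchStatus.PENDING


def to_document(state: SketchOrderState) -> Dict[str, Any]:
    """Assumed layout: aggregate state nested under a 'state' key."""
    return {
        "id": state.id,
        "state": {"customer_id": state.customer_id, "status": state.status.value},
    }


def from_document(doc: Dict[str, Any]) -> SketchOrderState:
    """Rebuild the domain object, converting the stored string back to an enum."""
    inner = doc["state"]
    return SketchOrderState(
        id=doc["id"],
        customer_id=inner["customer_id"],
        status=SketchStatus(inner["status"]),
    )


def get_path(doc: Dict[str, Any], dotted_path: str) -> Any:
    """Walk nested dicts using dot notation, e.g. 'state.status'."""
    current: Any = doc
    for key in dotted_path.split("."):
        if not isinstance(current, dict) or key not in current:
            return _MISSING
        current = current[key]
    return current


def matches(doc: Dict[str, Any], query: Dict[str, Any]) -> bool:
    """Evaluate equality and $in conditions against one document."""
    for path, condition in query.items():
        value = get_path(doc, path)
        if isinstance(condition, dict) and "$in" in condition:
            if value not in condition["$in"]:
                return False
        elif value != condition:
            return False
    return True
```

Storing the enum's `.value` is what lets a filter compare plain strings, which is why the repository methods pass `order_status.value` rather than the enum member itself.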
Step 3: Configure MongoDB Connection
In main.py:
from neuroglia.data.infrastructure.mongo import MotorRepository

from domain.entities import Order, Customer, Pizza
from domain.repositories import (
    IOrderRepository,
    ICustomerRepository,
    IPizzaRepository
)
from integration.repositories import (
    MongoOrderRepository,
    MongoCustomerRepository,
    MongoPizzaRepository
)


def create_pizzeria_app():
    builder = WebApplicationBuilder()

    # ... other configuration ...

    # Configure MongoDB repositories
    MotorRepository.configure(
        builder,
        entity_type=Order,
        key_type=str,
        database_name="mario_pizzeria",
        collection_name="orders"
    )
    MotorRepository.configure(
        builder,
        entity_type=Customer,
        key_type=str,
        database_name="mario_pizzeria",
        collection_name="customers"
    )

    # Register repository implementations
    builder.services.add_scoped(IOrderRepository, MongoOrderRepository)
    builder.services.add_scoped(ICustomerRepository, MongoCustomerRepository)

    return builder.build_app_with_lifespan(...)
What this configuration does:
- Creates MongoDB connection pool
- Sets up collection access
- Registers serialization/deserialization
- Binds interface to implementation in DI
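The last point, binding an interface to an implementation with a scoped lifetime, can be illustrated with a toy container. This is not how Neuroglia's service collection is implemented; `SketchContainer`, `SketchScope`, `ISketchRepository`, and `DictSketchRepository` are hypothetical names, and the sketch ignores constructor injection entirely. It only shows the idea that a scope (for example, one per request) hands out a single instance per interface.

```python
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict


class ISketchRepository(ABC):
    """Hypothetical interface requested by application code."""

    @abstractmethod
    def backend_name(self) -> str: ...


class DictSketchRepository(ISketchRepository):
    """Hypothetical implementation registered for the interface."""

    def backend_name(self) -> str:
        return "dict-backed"


class SketchScope:
    """Caches one instance per interface for the lifetime of the scope."""

    def __init__(self, factories: Dict[type, Callable[[], Any]]) -> None:
        self._factories = factories
        self._instances: Dict[type, Any] = {}

    def get(self, interface: type) -> Any:
        if interface not in self._instances:
            # First request in this scope: build the registered implementation
            self._instances[interface] = self._factories[interface]()
        return self._instances[interface]


class SketchContainer:
    """Maps interfaces to the implementation that should satisfy them."""

    def __init__(self) -> None:
        self._factories: Dict[type, Callable[[], Any]] = {}

    def add_scoped(self, interface: type, implementation: Callable[[], Any]) -> None:
        self._factories[interface] = implementation

    def create_scope(self) -> SketchScope:
        return SketchScope(self._factories)
```

Code that asks a scope for `ISketchRepository` never names `DictSketchRepository`, so switching to another implementation is a one-line change at registration time.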
Step 4: Environment Configuration
Create .env file:
# MongoDB Configuration
MONGODB_URI=mongodb://localhost:27017
MONGODB_DATABASE=mario_pizzeria
# Or for MongoDB Atlas
# MONGODB_URI=mongodb+srv://user:pass@cluster.mongodb.net/
# Application Settings
LOG_LEVEL=INFO
Load in application/settings.py:
"""Application settings"""
from pydantic_settings import BaseSettings, SettingsConfigDict


class AppSettings(BaseSettings):
    """Application configuration"""

    # Read values from .env; real environment variables take precedence
    model_config = SettingsConfigDict(env_file=".env")

    # MongoDB
    mongodb_uri: str = "mongodb://localhost:27017"
    mongodb_database: str = "mario_pizzeria"

    # Logging
    log_level: str = "INFO"


# Singleton instance
app_settings = AppSettings()
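The behaviour the settings class relies on is that an environment variable such as `MONGODB_DATABASE` overrides the default of the field with the matching name. The following is a stdlib-only imitation of that idea, not pydantic-settings itself; `SketchSettings` and `load_settings` are hypothetical, and the sketch does not read `.env` files or validate types.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class SketchSettings:
    """Defaults mirror the tutorial's AppSettings fields."""

    mongodb_uri: str = "mongodb://localhost:27017"
    mongodb_database: str = "mario_pizzeria"
    log_level: str = "INFO"


def load_settings(env: Mapping[str, str] = os.environ) -> SketchSettings:
    """Build settings, letting upper-cased variable names override defaults."""
    defaults = SketchSettings()
    return SketchSettings(
        mongodb_uri=env.get("MONGODB_URI", defaults.mongodb_uri),
        mongodb_database=env.get("MONGODB_DATABASE", defaults.mongodb_database),
        log_level=env.get("LOG_LEVEL", defaults.log_level),
    )
```

Passing the mapping explicitly, as in `load_settings({"MONGODB_DATABASE": "mario_pizzeria_test"})`, makes it easy to point tests at a separate database without touching the process environment.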
Transaction Management with Repository Pattern
The Command Handler serves as the transaction boundary, and the Repository coordinates persistence with automatic event publishing.
How Repository-Based Transactions Work
# In command handler
async def handle_async(self, command: PlaceOrderCommand):
    # 1. Create order (in memory)
    order = Order(customer_id=command.customer_id)
    order.add_order_item(item)  # item: an OrderItem built from the command (construction omitted)
    order.confirm_order()  # Records an OrderConfirmedEvent on the aggregate

    # 2. Save changes via repository (transaction boundary)
    await self.order_repository.add_async(order)

    # Repository does:
    # - Saves order state to database
    # - Gets uncommitted events from order
    # - Publishes events to event bus
    # - Clears uncommitted events from order
    # - All in a transactional scope!
Benefits:
- Atomic: State changes and event publishing succeed or fail together
- Event consistency: Events only published if database save succeeds
- Automatic: No manual event publishing needed
- Simple: Command handler IS the transaction boundary
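The ordering described above (persist state, then publish pending events, then clear them) can be sketched without any framework. `SketchOrder`, `EventPublishingRepository`, and `place_order` are hypothetical stand-ins, and the `fail_on_save` flag exists only to simulate a database error. The sketch just sequences the steps; it makes no attempt at rollback if publishing were to fail after the save, which is something the real framework would have to handle.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Callable, Dict, List


@dataclass
class SketchOrder:
    id: str
    status: str = "pending"
    uncommitted_events: List[str] = field(default_factory=list)

    def confirm(self) -> None:
        # Domain behaviour: change state and record what happened
        self.status = "confirmed"
        self.uncommitted_events.append("OrderConfirmed")


class EventPublishingRepository:
    def __init__(self, publish: Callable[[str], None], fail_on_save: bool = False) -> None:
        self._store: Dict[str, str] = {}
        self._publish = publish
        self._fail_on_save = fail_on_save

    async def add_async(self, order: SketchOrder) -> None:
        if self._fail_on_save:
            raise IOError("simulated database failure")
        self._store[order.id] = order.status           # 1. persist state
        for event in list(order.uncommitted_events):   # 2. publish pending events
            self._publish(event)
        order.uncommitted_events.clear()               # 3. mark events as handled


async def place_order(repo: EventPublishingRepository, order_id: str) -> SketchOrder:
    order = SketchOrder(id=order_id)
    order.confirm()
    await repo.add_async(order)
    return order


def run_demo() -> Dict[str, List[str]]:
    """Collect published events for a successful and a failing save."""
    ok_events: List[str] = []
    asyncio.run(place_order(EventPublishingRepository(ok_events.append), "o-1"))

    failed_events: List[str] = []
    failing_repo = EventPublishingRepository(failed_events.append, fail_on_save=True)
    try:
        asyncio.run(place_order(failing_repo, "o-2"))
    except IOError:
        pass  # the save failed before any event could be published
    return {"after_success": ok_events, "after_failure": failed_events}
```

Because publishing happens only after the save line has run, a failed save leaves the event list untouched on the aggregate and nothing reaches subscribers.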
Configure Repositories
In main.py:
from domain.repositories import IOrderRepository
from integration.repositories import MongoOrderRepository

# Configure repositories with automatic event publishing
builder.services.add_scoped(IOrderRepository, MongoOrderRepository)

# Repository automatically handles:
# - State persistence
# - Event publishing
# - Transaction coordination
Testing Repositories
Option 1: In-Memory Repository (Unit Tests)
Create tests/fixtures/in_memory_order_repository.py:
```python
"""In-memory repository for testing"""
from typing import Dict, List, Optional

from domain.entities import Order, OrderStatus
from domain.repositories import IOrderRepository


class InMemoryOrderRepository(IOrderRepository):
    """In-memory implementation for testing"""

    def __init__(self):
        self._orders: Dict[str, Order] = {}

    async def get_async(self, order_id: str) -> Optional[Order]:
        return self._orders.get(order_id)

    async def add_async(self, order: Order) -> None:
        self._orders[order.id()] = order

    async def update_async(self, order: Order) -> None:
        self._orders[order.id()] = order

    async def delete_async(self, order_id: str) -> None:
        self._orders.pop(order_id, None)

    async def list_async(self) -> List[Order]:
        return list(self._orders.values())

    async def find_by_status_async(self, status: str) -> List[Order]:
        order_status = OrderStatus(status.lower())
        return [
            o for o in self._orders.values()
            if o.state.status == order_status
        ]
```
Use in tests:
import pytest

# Import InMemoryOrderRepository, PlaceOrderHandler and PlaceOrderCommand from your project modules


@pytest.fixture
def order_repository():
    return InMemoryOrderRepository()


@pytest.mark.asyncio
async def test_place_order_handler(order_repository):
    """Test handler with in-memory repository"""
    handler = PlaceOrderHandler(
        order_repository=order_repository,
        # ... other mocks
    )
    command = PlaceOrderCommand(...)

    result = await handler.handle_async(command)
    assert result.is_success

    # Verify order was saved
    orders = await order_repository.list_async()
    assert len(orders) == 1
Option 2: Integration Tests with MongoDB
Create tests/integration/test_mongo_order_repository.py:
"""Integration tests for MongoDB repository"""
from decimal import Decimal

import pytest
import pytest_asyncio
from motor.motor_asyncio import AsyncIOMotorClient

from domain.entities import Order, OrderItem, OrderStatus, PizzaSize
from integration.repositories import MongoOrderRepository


# Async fixtures need pytest_asyncio.fixture when pytest-asyncio runs in strict mode
@pytest_asyncio.fixture
async def mongo_client():
    """Create test MongoDB client"""
    client = AsyncIOMotorClient("mongodb://localhost:27017")
    yield client
    # Cleanup
    await client.mario_pizzeria_test.orders.delete_many({})
    client.close()


@pytest_asyncio.fixture
async def order_repository(mongo_client):
    """Create repository with test collection"""
    collection = mongo_client.mario_pizzeria_test.orders
    return MongoOrderRepository(collection)


@pytest.mark.asyncio
@pytest.mark.integration
async def test_crud_operations(order_repository):
    """Test complete CRUD workflow"""
    # Create
    order = Order(customer_id="cust-123")
    item = OrderItem.create(
        name="Margherita",
        size=PizzaSize.LARGE,
        quantity=1,
        unit_price=Decimal("12.99")
    )
    order.add_order_item(item)
    order.confirm_order()
    await order_repository.add_async(order)

    # Read
    retrieved = await order_repository.get_async(order.id())
    assert retrieved is not None
    assert retrieved.state.customer_id == "cust-123"
    assert retrieved.pizza_count == 1

    # Update
    retrieved.start_cooking(user_id="chef-1", user_name="Mario")
    await order_repository.update_async(retrieved)

    # Verify update
    updated = await order_repository.get_async(order.id())
    assert updated.state.status == OrderStatus.COOKING

    # Delete
    await order_repository.delete_async(order.id())
    deleted = await order_repository.get_async(order.id())
    assert deleted is None
Run integration tests:
# Start MongoDB
docker run -d -p 27017:27017 mongo:latest
# Run tests
poetry run pytest tests/integration/ -m integration -v
Key Takeaways
- Repository Pattern: Abstracts data access behind interfaces
- Clean Architecture: Domain doesn't depend on infrastructure
- Motor: Async MongoDB driver for Python
- MotorRepository: Framework base class with CRUD operations
- Event Publishing: Repositories persist aggregate state and publish domain events automatically
- Testing: Use in-memory repos for unit tests, real DB for integration tests
What's Next?
In Part 7: Authentication & Authorization, you'll learn:
- OAuth2 and JWT authentication
- Keycloak integration
- Role-based access control (RBAC)
- Protecting API endpoints
Previous: ← Part 5: Events & Integration | Next: Part 7: Authentication & Authorization →