Part 6: Persistence & Repositories

Time: 45 minutes | Prerequisites: Part 5

In this tutorial, you'll implement data persistence using the Repository pattern with MongoDB. You'll learn how to abstract data access and maintain clean separation between domain and infrastructure.

🎯 What You'll Learn

  • The Repository pattern and why it matters
  • MongoDB integration with Motor (async driver)
  • Implementing repositories for aggregates
  • Data persistence with repository pattern and automatic event publishing
  • Testing data access layers

💾 Understanding the Repository Pattern

The Problem

Without repositories, domain logic is polluted with database code:

# ❌ Domain entity knows about MongoDB
class Order(AggregateRoot):
    async def save(self):
        collection = mongo_client.db.orders
        await collection.insert_one(self.__dict__)  # 😱

Problems:

  • Domain depends on infrastructure (MongoDB)
  • Can't test without database
  • Can't swap database implementations
  • Violates clean architecture

The Solution: Repository Pattern

Repositories abstract data access behind interfaces:

┌──────────────┐
│   Domain     │  Uses interface
│   (Order)    │──────────┐
└──────────────┘          │
                          ▼
                 ┌────────────────┐
                 │  IOrderRepo    │  (interface)
                 │  - get()       │
                 │  - add()       │
                 │  - list()      │
                 └────────┬───────┘
                          │
        ┌─────────────────┼─────────────────┐
        ▼                 ▼                 ▼
┌──────────────┐  ┌──────────────┐  ┌──────────────┐
│MongoOrderRepo│  │  FileRepo    │  │ InMemoryRepo │
│              │  │              │  │              │
└──────────────┘  └──────────────┘  └──────────────┘

Benefits:

  • Testability: Use in-memory repo for tests
  • Flexibility: Swap implementations without changing domain
  • Clean Architecture: Domain doesn't depend on infrastructure
  • Consistency: Standard interface for all data access
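
The flexibility benefit can be illustrated with a small, self-contained sketch. Everything here is hypothetical and simplified for illustration: `OrderStore`, `InMemoryOrderStore`, `JsonFileOrderStore`, and `place_and_reload` are not part of the tutorial's code base, and orders are plain dicts rather than `Order` aggregates. The point is only that the calling code stays identical when the implementation is swapped.

```python
import asyncio
import json
import tempfile
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Dict, Optional


class OrderStore(ABC):
    """Hypothetical, trimmed-down repository interface (orders are dicts here)."""

    @abstractmethod
    async def add_async(self, order_id: str, order: dict) -> None: ...

    @abstractmethod
    async def get_async(self, order_id: str) -> Optional[dict]: ...


class InMemoryOrderStore(OrderStore):
    """Keeps orders in a dict; suitable for unit tests."""

    def __init__(self) -> None:
        self._orders: Dict[str, dict] = {}

    async def add_async(self, order_id: str, order: dict) -> None:
        self._orders[order_id] = order

    async def get_async(self, order_id: str) -> Optional[dict]:
        return self._orders.get(order_id)


class JsonFileOrderStore(OrderStore):
    """Writes one JSON file per order; blocking file I/O is used for brevity."""

    def __init__(self, directory: Path) -> None:
        self._directory = directory

    async def add_async(self, order_id: str, order: dict) -> None:
        (self._directory / f"{order_id}.json").write_text(json.dumps(order))

    async def get_async(self, order_id: str) -> Optional[dict]:
        path = self._directory / f"{order_id}.json"
        return json.loads(path.read_text()) if path.exists() else None


async def place_and_reload(store: OrderStore) -> Optional[dict]:
    """Caller code: unchanged no matter which implementation is passed in."""
    await store.add_async("order-1", {"customer_id": "cust-123", "status": "pending"})
    return await store.get_async("order-1")


def run_with_both_stores() -> tuple:
    """Run the same caller against both implementations."""
    with tempfile.TemporaryDirectory() as tmp:
        from_memory = asyncio.run(place_and_reload(InMemoryOrderStore()))
        from_file = asyncio.run(place_and_reload(JsonFileOrderStore(Path(tmp))))
    return from_memory, from_file
```

Both calls return the same order data, which is what lets tests use the in-memory variant while production uses a database-backed one.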

πŸ“ Defining Repository InterfacesΒΆ

Step 1: Create Repository InterfaceΒΆ

Create domain/repositories/__init__.py:

"""Repository interfaces for domain entities"""
from abc import ABC, abstractmethod
from typing import List, Optional

from domain.entities import Order


class IOrderRepository(ABC):
    """
    Interface for Order persistence.

    Domain defines the contract, infrastructure implements it.
    """

    @abstractmethod
    async def get_async(self, order_id: str) -> Optional[Order]:
        """Get order by ID"""
        pass

    @abstractmethod
    async def add_async(self, order: Order) -> None:
        """Add new order"""
        pass

    @abstractmethod
    async def update_async(self, order: Order) -> None:
        """Update existing order"""
        pass

    @abstractmethod
    async def delete_async(self, order_id: str) -> None:
        """Delete order"""
        pass

    @abstractmethod
    async def list_async(self) -> List[Order]:
        """Get all orders"""
        pass

    @abstractmethod
    async def find_by_status_async(self, status: str) -> List[Order]:
        """Find orders by status"""
        pass

Key points:

  • Interface only: No implementation details
  • Domain types: Works with Order entities, not dicts/documents
  • Async: All methods async for non-blocking I/O
  • Business queries: find_by_status_async reflects business needs
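
One practical consequence of declaring the interface with `ABC` and `@abstractmethod` is that Python refuses to instantiate an implementation that forgets a method. The sketch below demonstrates this with a hypothetical one-method interface (`IStatusQueries`) standing in for `IOrderRepository`; the class and helper names are invented for the example.

```python
from abc import ABC, abstractmethod
from typing import List


class IStatusQueries(ABC):
    """Hypothetical one-method interface, standing in for IOrderRepository."""

    @abstractmethod
    async def find_by_status_async(self, status: str) -> List[str]:
        """Return the IDs of orders in the given status."""


class IncompleteRepository(IStatusQueries):
    """Forgets to implement find_by_status_async."""


class CompleteRepository(IStatusQueries):
    """Implements every abstract method, so it can be instantiated."""

    async def find_by_status_async(self, status: str) -> List[str]:
        return []


def can_instantiate(cls: type) -> bool:
    """Report whether Python allows creating an instance of cls."""
    try:
        cls()
    except TypeError:
        # Raised when abstract methods are left unimplemented
        return False
    return True
```

Here `can_instantiate(CompleteRepository)` is true, while the incomplete subclass and the interface itself are rejected at construction time rather than failing later on a missing method.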

πŸ—„οΈ MongoDB ImplementationΒΆ

Step 1: Install MotorΒΆ

Motor is the async MongoDB driver:

poetry add motor pymongo

Step 2: Implement MongoDB Repository

Create integration/repositories/mongo_order_repository.py:

"""MongoDB implementation of IOrderRepository"""
from typing import List, Optional

from motor.motor_asyncio import AsyncIOMotorCollection

from domain.entities import Order, OrderStatus
from domain.repositories import IOrderRepository
from neuroglia.data.infrastructure.mongo import MotorRepository


class MongoOrderRepository(
    MotorRepository[Order, str],
    IOrderRepository
):
    """
    MongoDB implementation of order repository.

    MotorRepository provides:
    - Automatic serialization/deserialization
    - CRUD operations
    - Query helpers
    """

    def __init__(self, collection: AsyncIOMotorCollection):
        """
        Initialize with MongoDB collection.

        Collection is injected by DI container.
        """
        super().__init__(collection, Order, str)

    async def find_by_status_async(
        self,
        status: str
    ) -> List[Order]:
        """Find orders by status"""
        # Convert status string to enum
        order_status = OrderStatus(status.lower())

        # Query MongoDB
        cursor = self.collection.find({"state.status": order_status.value})

        # Deserialize to Order entities
        orders = []
        async for doc in cursor:
            order = await self._deserialize_async(doc)
            orders.append(order)

        return orders

    async def find_active_orders_async(self) -> List[Order]:
        """Find orders that are not delivered or cancelled"""
        active_statuses = [
            OrderStatus.PENDING.value,
            OrderStatus.CONFIRMED.value,
            OrderStatus.COOKING.value,
            OrderStatus.READY.value,
            OrderStatus.DELIVERING.value,
        ]

        cursor = self.collection.find({
            "state.status": {"$in": active_statuses}
        })

        orders = []
        async for doc in cursor:
            order = await self._deserialize_async(doc)
            orders.append(order)

        return orders

What MotorRepository provides:

  • get_async(id): Get by ID
  • add_async(entity): Insert new entity
  • update_async(entity): Update existing entity
  • delete_async(id): Delete by ID
  • list_async(): Get all entities
  • Automatic serialization using JsonSerializer
  • Automatic deserialization to domain entities
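
The two custom queries in MongoOrderRepository boil down to building MongoDB filter documents. The sketch below isolates that step so it can be checked without a database. The `OrderStatus` values, the `STATUS_FIELD` constant, and the helper functions are assumptions made for this example; the tutorial's real enum lives in domain.entities and may differ.

```python
from enum import Enum
from typing import Any, Dict, Iterable


class OrderStatus(Enum):
    """Assumed values; the real enum is defined in domain.entities."""
    PENDING = "pending"
    CONFIRMED = "confirmed"
    COOKING = "cooking"
    READY = "ready"
    DELIVERING = "delivering"
    DELIVERED = "delivered"
    CANCELLED = "cancelled"


STATUS_FIELD = "state.status"  # dotted path used by the repository queries


def status_filter(status: str) -> Dict[str, Any]:
    """Filter for one status; raises ValueError for an unknown status string."""
    return {STATUS_FIELD: OrderStatus(status.lower()).value}


def statuses_filter(statuses: Iterable[OrderStatus]) -> Dict[str, Any]:
    """Filter matching any of the given statuses via MongoDB's $in operator."""
    return {STATUS_FIELD: {"$in": [s.value for s in statuses]}}


def active_filter() -> Dict[str, Any]:
    """Everything that is neither delivered nor cancelled."""
    terminal = {OrderStatus.DELIVERED, OrderStatus.CANCELLED}
    return statuses_filter(s for s in OrderStatus if s not in terminal)
```

Either dict can be passed as the filter argument to a Motor collection's `find`. Deriving the active set by excluding terminal statuses is a design choice of this sketch; listing the active statuses explicitly, as the repository does, is equally valid.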

Step 3: Configure MongoDB Connection

In main.py:

from neuroglia.data.infrastructure.mongo import MotorRepository
from domain.entities import Order, Customer, Pizza
from domain.repositories import (
    IOrderRepository,
    ICustomerRepository,
    IPizzaRepository
)
from integration.repositories import (
    MongoOrderRepository,
    MongoCustomerRepository,
    MongoPizzaRepository
)

def create_pizzeria_app():
    builder = WebApplicationBuilder()

    # ... other configuration ...

    # Configure MongoDB repositories
    MotorRepository.configure(
        builder,
        entity_type=Order,
        key_type=str,
        database_name="mario_pizzeria",
        collection_name="orders"
    )

    MotorRepository.configure(
        builder,
        entity_type=Customer,
        key_type=str,
        database_name="mario_pizzeria",
        collection_name="customers"
    )

    # Register repository implementations
    builder.services.add_scoped(IOrderRepository, MongoOrderRepository)
    builder.services.add_scoped(ICustomerRepository, MongoCustomerRepository)

    return builder.build_app_with_lifespan(...)

This configuration does four things:

  1. Creates MongoDB connection pool
  2. Sets up collection access
  3. Registers serialization/deserialization
  4. Binds interface to implementation in DI
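
To make the last step concrete, here is a toy illustration of what binding an interface to an implementation with a scoped lifetime usually means: one instance per scope (for example, per request), created on first use. `TinyContainer` and `TinyScope` are hypothetical and are not Neuroglia's container; the sketch assumes "scoped" carries its conventional dependency-injection meaning.

```python
from typing import Any, Callable, Dict


class IOrderRepository:
    """Placeholder interface for this sketch."""


class FakeOrderRepository(IOrderRepository):
    """Placeholder implementation for this sketch."""


class TinyScope:
    """Caches one instance per interface for the lifetime of the scope."""

    def __init__(self, factories: Dict[type, Callable[[], Any]]) -> None:
        self._factories = factories
        self._instances: Dict[type, Any] = {}

    def resolve(self, interface: type) -> Any:
        # Create the implementation lazily, then reuse it within this scope
        if interface not in self._instances:
            self._instances[interface] = self._factories[interface]()
        return self._instances[interface]


class TinyContainer:
    """Hypothetical registry mapping interfaces to implementation factories."""

    def __init__(self) -> None:
        self._factories: Dict[type, Callable[[], Any]] = {}

    def add_scoped(self, interface: type, implementation: Callable[[], Any]) -> None:
        self._factories[interface] = implementation

    def create_scope(self) -> TinyScope:
        return TinyScope(self._factories)
```

Code that asks a scope for `IOrderRepository` receives a `FakeOrderRepository` without ever naming it, and two scopes receive two separate instances.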

Step 4: Environment Configuration

Create .env file:

# MongoDB Configuration
MONGODB_URI=mongodb://localhost:27017
MONGODB_DATABASE=mario_pizzeria

# Or for MongoDB Atlas
# MONGODB_URI=mongodb+srv://user:pass@cluster.mongodb.net/

# Application Settings
LOG_LEVEL=INFO

Load in application/settings.py:

"""Application settings"""
from pydantic_settings import BaseSettings, SettingsConfigDict


class AppSettings(BaseSettings):
    """Application configuration"""

    # Read values from .env (pydantic-settings v2 style configuration)
    model_config = SettingsConfigDict(env_file=".env")

    # MongoDB
    mongodb_uri: str = "mongodb://localhost:27017"
    mongodb_database: str = "mario_pizzeria"

    # Logging
    log_level: str = "INFO"


# Singleton instance
app_settings = AppSettings()
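
When a value is defined in several places, pydantic-settings gives real environment variables priority over the .env file, and the .env file priority over field defaults; variable names are matched case-insensitively by default. The following stdlib-only sketch mimics that lookup order so it can be tried without installing anything. `parse_dotenv` and `resolve` are hypothetical helpers written for this illustration, not pydantic-settings APIs, and the parser ignores quoting.

```python
from typing import Dict, Mapping

SAMPLE_DOTENV = """\
# MongoDB Configuration
MONGODB_URI=mongodb://localhost:27017
MONGODB_DATABASE=mario_pizzeria
"""


def parse_dotenv(text: str) -> Dict[str, str]:
    """Parse KEY=VALUE lines, skipping blanks and # comments (no quoting support)."""
    values: Dict[str, str] = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")  # split on the first '=' only
        values[key.strip().lower()] = value.strip()
    return values


def resolve(name: str, environ: Mapping[str, str],
            dotenv: Mapping[str, str], default: str) -> str:
    """Look up a setting: environment first, then .env values, then the default."""
    env_lower = {key.lower(): value for key, value in environ.items()}
    if name in env_lower:
        return env_lower[name]
    if name in dotenv:
        return dotenv[name]
    return default
```

In a deployment this means the .env file can hold local defaults while the container or CI environment overrides `MONGODB_URI` without touching the file.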

🔄 Transaction Management with Repository Pattern

The Command Handler serves as the transaction boundary, and the Repository coordinates persistence with automatic event publishing.

How Repository-Based Transactions Work

# In command handler
async def handle_async(self, command: PlaceOrderCommand):
    # 1️⃣ Create order (in memory)
    order = Order(customer_id=command.customer_id)
    order.add_order_item(item)
    order.confirm_order()  # Raises OrderConfirmedEvent internally

    # 2️⃣ Save changes via repository (transaction boundary)
    await self.order_repository.add_async(order)

    # Repository does:
    # - Saves order state to database
    # - Gets uncommitted events from order
    # - Publishes events to event bus
    # - Clears uncommitted events from order
    # - All in a transactional scope!

Benefits:

  • Atomic: State changes and event publishing succeed or fail together
  • Event consistency: Events only published if database save succeeds
  • Automatic: No manual event publishing needed
  • Simple: Command handler IS the transaction boundary
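
The save-then-publish sequence described above can be sketched without any framework. This is an illustration under stated assumptions, not Neuroglia's implementation: `Order`, `RecordingEventBus`, `EventPublishingRepository`, and `place_order` are hypothetical, events are plain strings, and the database is a dict with a switch to simulate a failed save.

```python
import asyncio
from typing import Dict, List, Tuple


class Order:
    """Simplified aggregate that records events until they are published."""

    def __init__(self, order_id: str) -> None:
        self.id = order_id
        self.status = "pending"
        self.pending_events: List[str] = []

    def confirm(self) -> None:
        self.status = "confirmed"
        self.pending_events.append(f"OrderConfirmed:{self.id}")


class RecordingEventBus:
    """Stand-in event bus that only remembers what was published."""

    def __init__(self) -> None:
        self.published: List[str] = []

    async def publish_async(self, event: str) -> None:
        self.published.append(event)


class EventPublishingRepository:
    """Persists state first, then publishes and clears the aggregate's events."""

    def __init__(self, bus: RecordingEventBus, fail_on_save: bool = False) -> None:
        self._bus = bus
        self._fail_on_save = fail_on_save
        self.saved: Dict[str, str] = {}

    async def add_async(self, order: Order) -> None:
        if self._fail_on_save:
            raise ConnectionError("simulated database failure")
        self.saved[order.id] = order.status          # 1. save state
        for event in list(order.pending_events):     # 2. publish pending events
            await self._bus.publish_async(event)
        order.pending_events.clear()                 # 3. clear them on the aggregate


async def place_order(fail_on_save: bool) -> Tuple[List[str], List[str]]:
    """Return (published events, events still pending on the order)."""
    bus = RecordingEventBus()
    repo = EventPublishingRepository(bus, fail_on_save=fail_on_save)
    order = Order("o-1")
    order.confirm()
    try:
        await repo.add_async(order)
    except ConnectionError:
        pass  # a real handler would report the failure to its caller
    return bus.published, order.pending_events
```

When the save succeeds the event is published and cleared; when it fails nothing is published and the event stays on the aggregate. This sketch does not undo the save if publishing itself fails, so it only illustrates the ordering, not full atomicity.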

Configure Repositories

In main.py:

from domain.repositories import IOrderRepository
from integration.repositories import MongoOrderRepository

# Configure repositories with automatic event publishing
builder.services.add_scoped(IOrderRepository, MongoOrderRepository)

# Repository automatically handles:
# - State persistence
# - Event publishing
# - Transaction coordination

🧪 Testing Repositories

Option 1: In-Memory Repository (Unit Tests)

Create tests/fixtures/in_memory_order_repository.py:

"""In-memory repository for testing"""
from typing import Dict, List, Optional

from domain.entities import Order, OrderStatus
from domain.repositories import IOrderRepository


class InMemoryOrderRepository(IOrderRepository):
    """In-memory implementation for testing"""

    def __init__(self):
        self._orders: Dict[str, Order] = {}

    async def get_async(self, order_id: str) -> Optional[Order]:
        return self._orders.get(order_id)

    async def add_async(self, order: Order) -> None:
        self._orders[order.id()] = order

    async def update_async(self, order: Order) -> None:
        self._orders[order.id()] = order

    async def delete_async(self, order_id: str) -> None:
        self._orders.pop(order_id, None)

    async def list_async(self) -> List[Order]:
        return list(self._orders.values())

    async def find_by_status_async(self, status: str) -> List[Order]:
        order_status = OrderStatus(status.lower())
        return [
            o for o in self._orders.values()
            if o.state.status == order_status
        ]

Use in tests:

@pytest.fixture
def order_repository():
    return InMemoryOrderRepository()


@pytest.mark.asyncio
async def test_place_order_handler(order_repository):
    """Test handler with in-memory repository"""
    handler = PlaceOrderHandler(
        order_repository=order_repository,
        # ... other mocks
    )

    command = PlaceOrderCommand(...)
    result = await handler.handle_async(command)

    assert result.is_success

    # Verify order was saved
    orders = await order_repository.list_async()
    assert len(orders) == 1

Option 2: Integration Tests with MongoDB

Create tests/integration/test_mongo_order_repository.py:

"""Integration tests for MongoDB repository"""
from decimal import Decimal

import pytest
import pytest_asyncio
from motor.motor_asyncio import AsyncIOMotorClient

from domain.entities import Order, OrderItem, OrderStatus, PizzaSize
from integration.repositories import MongoOrderRepository


# pytest_asyncio.fixture is needed for async fixtures in pytest-asyncio's strict mode
@pytest_asyncio.fixture
async def mongo_client():
    """Create test MongoDB client"""
    client = AsyncIOMotorClient("mongodb://localhost:27017")
    yield client

    # Cleanup
    await client.mario_pizzeria_test.orders.delete_many({})
    client.close()


@pytest_asyncio.fixture
async def order_repository(mongo_client):
    """Create repository with test collection"""
    collection = mongo_client.mario_pizzeria_test.orders
    return MongoOrderRepository(collection)


@pytest.mark.asyncio
@pytest.mark.integration
async def test_crud_operations(order_repository):
    """Test complete CRUD workflow"""
    # Create
    order = Order(customer_id="cust-123")
    item = OrderItem.create(
        name="Margherita",
        size=PizzaSize.LARGE,
        quantity=1,
        unit_price=Decimal("12.99")
    )
    order.add_order_item(item)
    order.confirm_order()

    await order_repository.add_async(order)

    # Read
    retrieved = await order_repository.get_async(order.id())
    assert retrieved is not None
    assert retrieved.state.customer_id == "cust-123"
    assert retrieved.pizza_count == 1

    # Update
    retrieved.start_cooking(user_id="chef-1", user_name="Mario")
    await order_repository.update_async(retrieved)

    # Verify update
    updated = await order_repository.get_async(order.id())
    assert updated.state.status == OrderStatus.COOKING

    # Delete
    await order_repository.delete_async(order.id())
    deleted = await order_repository.get_async(order.id())
    assert deleted is None

Run integration tests:

# Start MongoDB
docker run -d -p 27017:27017 mongo:latest

# Run tests
poetry run pytest tests/integration/ -m integration -v

πŸ“ Key TakeawaysΒΆ

  1. Repository Pattern: Abstracts data access behind interfaces
  2. Clean Architecture: Domain doesn't depend on infrastructure
  3. Motor: Async MongoDB driver for Python
  4. MotorRepository: Framework base class with CRUD operations
  5. Event Publishing: The repository persists state and publishes domain events automatically
  6. Testing: Use in-memory repos for unit tests, real DB for integration tests

🚀 What's Next?

In Part 7: Authentication & Authorization, you'll learn:

  • OAuth2 and JWT authentication
  • Keycloak integration
  • Role-based access control (RBAC)
  • Protecting API endpoints

Previous: ← Part 5: Events & Integration | Next: Part 7: Authentication & Authorization →