Skip to content

🏦 OpenBank Sample Application¢

OpenBank is a comprehensive sample application that demonstrates advanced Neuroglia features including event sourcing, CQRS, domain-driven design, and event-driven architecture. It simulates a simple banking system with persons and accounts.

🎯 Overview¢

The OpenBank sample showcases:

  • Event Sourcing: Complete event-sourced domain with event store
  • CQRS: Separate command and query models
  • Domain-Driven Design: Rich domain models with business rules
  • Event-Driven Architecture: Domain events and integration events
  • Clean Architecture: Clear separation of layers
  • Repository Pattern: Both write (event sourcing) and read (MongoDB) repositories

πŸ—οΈ ArchitectureΒΆ

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                    API Layer                                    β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚
β”‚  β”‚  PersonsController β”‚  β”‚ AccountsController β”‚  β”‚  Other APIs  β”‚   β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                              β”‚
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β–Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                    Application Layer                            β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚
β”‚  β”‚    Commands     β”‚  β”‚     Queries     β”‚  β”‚     Events     β”‚   β”‚
β”‚  β”‚   Handlers      β”‚  β”‚    Handlers     β”‚  β”‚   Handlers     β”‚   β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                              β”‚
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β–Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                    Domain Layer                                 β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚
β”‚  β”‚     Person      β”‚  β”‚     Account     β”‚  β”‚    Address     β”‚   β”‚
β”‚  β”‚   Aggregate     β”‚  β”‚   Aggregate     β”‚  β”‚ Value Object   β”‚   β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                              β”‚
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β–Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                  Integration Layer                              β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚
β”‚  β”‚ Event Store     β”‚  β”‚   MongoDB       β”‚  β”‚  API Clients   β”‚   β”‚
β”‚  β”‚ Repository      β”‚  β”‚  Repository     β”‚  β”‚                β”‚   β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

πŸš€ Getting StartedΒΆ

PrerequisitesΒΆ

  • Python 3.11+
  • Docker and Docker Compose
  • MongoDB (via Docker)
  • EventStoreDB (via Docker)

SetupΒΆ

  1. Start Dependencies:
cd samples/openbank
docker-compose up -d mongodb eventstoredb
  1. Install Dependencies:
pip install -r requirements.txt
  1. Run the Application:
python api/main.py
  1. Access the API:

  2. API Documentation: http://localhost:8000/api/docs

  3. EventStoreDB UI: http://localhost:2113 (admin/changeit)

πŸ“ Project StructureΒΆ

samples/openbank/
β”œβ”€β”€ api/
β”‚   β”œβ”€β”€ main.py                     # Application entry point
β”‚   └── controllers/
β”‚       β”œβ”€β”€ persons_controller.py   # Person management API
β”‚       └── accounts_controller.py  # Account management API
β”œβ”€β”€ application/
β”‚   β”œβ”€β”€ commands/
β”‚   β”‚   β”œβ”€β”€ persons/
β”‚   β”‚   β”‚   └── register_person_command.py
β”‚   β”‚   └── accounts/
β”‚   β”‚       β”œβ”€β”€ open_account_command.py
β”‚   β”‚       └── deposit_command.py
β”‚   β”œβ”€β”€ queries/
β”‚   β”‚   β”œβ”€β”€ person_by_id.py
β”‚   β”‚   └── account_by_owner.py
β”‚   └── events/
β”‚       β”œβ”€β”€ integration/
β”‚       β”‚   └── person_registered_handler.py
β”‚       └── domain/
β”œβ”€β”€ domain/
β”‚   └── models/
β”‚       β”œβ”€β”€ person.py               # Person aggregate
β”‚       β”œβ”€β”€ account.py              # Account aggregate
β”‚       └── address.py              # Address value object
└── integration/
    β”œβ”€β”€ models/                     # DTOs and read models
    β”‚   β”œβ”€β”€ person.py
    β”‚   └── account.py
    └── commands/                   # API command DTOs
        └── persons/
            └── register_person_command_dto.py

πŸ›οΈ Domain ModelsΒΆ

Person AggregateΒΆ

The Person aggregate manages person registration and personal information:

from dataclasses import dataclass
from datetime import date
from neuroglia.data.abstractions import AggregateRoot
from samples.openbank.integration import PersonGender

@dataclass
class PersonState:
    """Person aggregate state"""
    id: str = None
    first_name: str = None
    last_name: str = None
    nationality: str = None
    gender: PersonGender = None
    date_of_birth: date = None
    address: Address = None

class Person(AggregateRoot[str]):
    """Person aggregate root"""

    def __init__(self, id: str = None):
        super().__init__(id)
        self.state = PersonState()

    def register(self, first_name: str, last_name: str, nationality: str, 
                gender: PersonGender, date_of_birth: date, address: Address):
        """Register a new person"""

        # Validate business rules
        if not first_name or not last_name:
            raise ValueError("First name and last name are required")

        if date_of_birth >= date.today():
            raise ValueError("Date of birth must be in the past")

        # Raise domain event
        self.apply(PersonRegisteredEvent(
            person_id=self.id,
            first_name=first_name,
            last_name=last_name,
            nationality=nationality,
            gender=gender,
            date_of_birth=date_of_birth,
            address=address
        ))

    def update_address(self, new_address: Address):
        """Update person's address"""
        self.apply(PersonAddressUpdatedEvent(
            person_id=self.id,
            old_address=self.state.address,
            new_address=new_address
        ))

    # Event handlers
    def on_person_registered(self, event: PersonRegisteredEvent):
        """Handle person registered event"""
        self.state.id = event.person_id
        self.state.first_name = event.first_name
        self.state.last_name = event.last_name
        self.state.nationality = event.nationality
        self.state.gender = event.gender
        self.state.date_of_birth = event.date_of_birth
        self.state.address = event.address

    def on_person_address_updated(self, event: PersonAddressUpdatedEvent):
        """Handle address updated event"""
        self.state.address = event.new_address

Account AggregateΒΆ

The Account aggregate manages banking accounts and transactions:

from decimal import Decimal
from neuroglia.data.abstractions import AggregateRoot

@dataclass
class AccountState:
    """Account aggregate state"""
    id: str = None
    owner_id: str = None
    account_number: str = None
    balance: Decimal = Decimal('0.00')
    currency: str = 'USD'
    is_active: bool = True

class Account(AggregateRoot[str]):
    """Account aggregate root"""

    def __init__(self, id: str = None):
        super().__init__(id)
        self.state = AccountState()

    def open(self, owner_id: str, account_number: str, initial_deposit: Decimal = None):
        """Open a new account"""

        # Validate business rules
        if not owner_id:
            raise ValueError("Owner ID is required")

        if not account_number:
            raise ValueError("Account number is required")

        if initial_deposit and initial_deposit < Decimal('0'):
            raise ValueError("Initial deposit cannot be negative")

        # Raise domain event
        self.apply(AccountOpenedEvent(
            account_id=self.id,
            owner_id=owner_id,
            account_number=account_number,
            initial_deposit=initial_deposit or Decimal('0.00')
        ))

    def deposit(self, amount: Decimal, description: str = None):
        """Deposit money to the account"""

        # Validate business rules
        if amount <= Decimal('0'):
            raise ValueError("Deposit amount must be positive")

        if not self.state.is_active:
            raise ValueError("Cannot deposit to inactive account")

        # Raise domain event
        self.apply(MoneyDepositedEvent(
            account_id=self.id,
            amount=amount,
            description=description,
            balance_after=self.state.balance + amount
        ))

    def withdraw(self, amount: Decimal, description: str = None):
        """Withdraw money from the account"""

        # Validate business rules
        if amount <= Decimal('0'):
            raise ValueError("Withdrawal amount must be positive")

        if not self.state.is_active:
            raise ValueError("Cannot withdraw from inactive account")

        if self.state.balance < amount:
            raise ValueError("Insufficient funds")

        # Raise domain event
        self.apply(MoneyWithdrawnEvent(
            account_id=self.id,
            amount=amount,
            description=description,
            balance_after=self.state.balance - amount
        ))

    # Event handlers
    def on_account_opened(self, event: AccountOpenedEvent):
        """Handle account opened event"""
        self.state.id = event.account_id
        self.state.owner_id = event.owner_id
        self.state.account_number = event.account_number
        self.state.balance = event.initial_deposit

    def on_money_deposited(self, event: MoneyDepositedEvent):
        """Handle money deposited event"""
        self.state.balance = event.balance_after

    def on_money_withdrawn(self, event: MoneyWithdrawnEvent):
        """Handle money withdrawn event"""
        self.state.balance = event.balance_after

πŸ’Ό Application LayerΒΆ

Command HandlersΒΆ

Command handlers execute business operations:

from neuroglia.mediation.mediator import CommandHandler
from neuroglia.data.infrastructure.abstractions import Repository

class RegisterPersonCommandHandler(CommandHandler[RegisterPersonCommand, OperationResult[PersonDto]]):
    """Handles person registration commands"""

    def __init__(self, 
                 mapper: Mapper,
                 person_repository: Repository[Person, str]):
        self.mapper = mapper
        self.person_repository = person_repository

    async def handle_async(self, command: RegisterPersonCommand) -> OperationResult[PersonDto]:
        try:
            # Create new person aggregate
            person = Person(str(uuid.uuid4()))

            # Execute business operation
            person.register(
                first_name=command.first_name,
                last_name=command.last_name,
                nationality=command.nationality,
                gender=command.gender,
                date_of_birth=command.date_of_birth,
                address=command.address
            )

            # Save to event store
            saved_person = await self.person_repository.add_async(person)

            # Map to DTO and return
            person_dto = self.mapper.map(saved_person.state, PersonDto)
            return self.created(person_dto)

        except ValueError as ex:
            return self.bad_request(str(ex))
        except Exception as ex:
            return self.internal_error(f"Failed to register person: {ex}")

class DepositCommandHandler(CommandHandler[DepositCommand, OperationResult[AccountDto]]):
    """Handles money deposit commands"""

    def __init__(self, 
                 mapper: Mapper,
                 account_repository: Repository[Account, str]):
        self.mapper = mapper
        self.account_repository = account_repository

    async def handle_async(self, command: DepositCommand) -> OperationResult[AccountDto]:
        try:
            # Load account from event store
            account = await self.account_repository.get_by_id_async(command.account_id)
            if account is None:
                return self.not_found("Account not found")

            # Execute business operation
            account.deposit(command.amount, command.description)

            # Save changes
            await self.account_repository.update_async(account)

            # Map to DTO and return
            account_dto = self.mapper.map(account.state, AccountDto)
            return self.ok(account_dto)

        except ValueError as ex:
            return self.bad_request(str(ex))
        except Exception as ex:
            return self.internal_error(f"Failed to deposit money: {ex}")

Query HandlersΒΆ

Query handlers retrieve data for read operations:

class GetPersonByIdQueryHandler(QueryHandler[GetPersonByIdQuery, OperationResult[PersonDto]]):
    """Handles person lookup queries"""

    def __init__(self, 
                 mapper: Mapper,
                 person_repository: Repository[PersonDto, str]):  # Read model repository
        self.mapper = mapper
        self.person_repository = person_repository

    async def handle_async(self, query: GetPersonByIdQuery) -> OperationResult[PersonDto]:
        person = await self.person_repository.get_by_id_async(query.person_id)

        if person is None:
            return self.not_found(f"Person with ID {query.person_id} not found")

        return self.ok(person)

class GetAccountsByOwnerQueryHandler(QueryHandler[GetAccountsByOwnerQuery, OperationResult[List[AccountDto]]]):
    """Handles account lookup by owner queries"""

    def __init__(self, account_repository: Repository[AccountDto, str]):
        self.account_repository = account_repository

    async def handle_async(self, query: GetAccountsByOwnerQuery) -> OperationResult[List[AccountDto]]:
        accounts = await self.account_repository.find_by_criteria_async(
            {"owner_id": query.owner_id}
        )
        return self.ok(accounts)

πŸ“‘ Event HandlingΒΆ

Domain EventsΒΆ

Domain events represent business events within aggregates:

@dataclass
class PersonRegisteredEvent(DomainEvent):
    """Event raised when a person is registered"""
    person_id: str
    first_name: str
    last_name: str
    nationality: str
    gender: PersonGender
    date_of_birth: date
    address: Address

@dataclass
class AccountOpenedEvent(DomainEvent):
    """Event raised when an account is opened"""
    account_id: str
    owner_id: str
    account_number: str
    initial_deposit: Decimal

@dataclass
class MoneyDepositedEvent(DomainEvent):
    """Event raised when money is deposited"""
    account_id: str
    amount: Decimal
    description: str
    balance_after: Decimal

Integration EventsΒΆ

Integration events handle cross-bounded-context communication:

class PersonRegisteredIntegrationEventHandler(EventHandler[PersonRegisteredEvent]):
    """Handles person registered events for integration purposes"""

    def __init__(self, 
                 cloud_event_publisher: CloudEventPublisher,
                 mapper: Mapper):
        self.cloud_event_publisher = cloud_event_publisher
        self.mapper = mapper

    async def handle_async(self, event: PersonRegisteredEvent):
        # Create integration event
        integration_event = PersonRegisteredIntegrationEvent(
            person_id=event.person_id,
            email=event.email,
            full_name=f"{event.first_name} {event.last_name}",
            timestamp=datetime.utcnow()
        )

        # Publish as CloudEvent
        await self.cloud_event_publisher.publish_async(
            event_type="person.registered.v1",
            data=integration_event,
            source="openbank.persons"
        )

πŸ—„οΈ Data AccessΒΆ

Event Sourcing RepositoryΒΆ

The write model uses event sourcing:

# Configuration in main.py
from neuroglia.data.infrastructure.event_sourcing import EventSourcingRepository
from neuroglia.data.infrastructure.event_sourcing.event_store import ESEventStore

# Configure Event Store
ESEventStore.configure(builder, EventStoreOptions(database_name, consumer_group))

# Configure event sourcing repositories
DataAccessLayer.WriteModel.configure(
    builder, 
    ["samples.openbank.domain.models"], 
    lambda builder_, entity_type, key_type: EventSourcingRepository.configure(
        builder_, entity_type, key_type
    )
)

Read Model RepositoryΒΆ

The read model uses MongoDB:

# Configuration in main.py
from neuroglia.data.infrastructure.mongo import MongoRepository

# Configure MongoDB repositories
DataAccessLayer.ReadModel.configure(
    builder,
    ["samples.openbank.integration.models", "samples.openbank.application.events"],
    lambda builder_, entity_type, key_type: MongoRepository.configure(
        builder_, entity_type, key_type, database_name
    )
)

🌐 API Layer¢

ControllersΒΆ

Controllers expose the domain through REST APIs:

class PersonsController(ControllerBase):
    """Persons management API"""

    @post("/", response_model=PersonDto, status_code=201)
    async def register_person(self, command: RegisterPersonCommandDto) -> PersonDto:
        """Register a new person"""
        # Map DTO to domain command
        domain_command = self.mapper.map(command, RegisterPersonCommand)

        # Execute through mediator
        result = await self.mediator.execute_async(domain_command)

        # Process and return result
        return self.process(result)

    @get("/", response_model=List[PersonDto])
    async def list_persons(self) -> List[PersonDto]:
        """List all registered persons"""
        query = ListPersonsQuery()
        result = await self.mediator.execute_async(query)
        return self.process(result)

    @get("/{person_id}", response_model=PersonDto)
    async def get_person_by_id(self, person_id: str) -> PersonDto:
        """Get person by ID"""
        query = GetPersonByIdQuery(person_id=person_id)
        result = await self.mediator.execute_async(query)
        return self.process(result)

class AccountsController(ControllerBase):
    """Accounts management API"""

    @post("/", response_model=AccountDto, status_code=201)
    async def open_account(self, command: OpenAccountCommandDto) -> AccountDto:
        """Open a new account"""
        domain_command = self.mapper.map(command, OpenAccountCommand)
        result = await self.mediator.execute_async(domain_command)
        return self.process(result)

    @post("/{account_id}/deposit", response_model=AccountDto)
    async def deposit(self, account_id: str, command: DepositCommandDto) -> AccountDto:
        """Deposit money to account"""
        domain_command = self.mapper.map(command, DepositCommand)
        domain_command.account_id = account_id
        result = await self.mediator.execute_async(domain_command)
        return self.process(result)

    @get("/by-owner/{owner_id}", response_model=List[AccountDto])
    async def get_accounts_by_owner(self, owner_id: str) -> List[AccountDto]:
        """Get all accounts for a person"""
        query = GetAccountsByOwnerQuery(owner_id=owner_id)
        result = await self.mediator.execute_async(query)
        return self.process(result)

πŸ§ͺ TestingΒΆ

Unit TestsΒΆ

Test domain logic in isolation:

def test_person_registration():
    # Arrange
    person = Person("test-id")
    address = Address("123 Main St", "Anytown", "12345", "USA")

    # Act
    person.register(
        first_name="John",
        last_name="Doe",
        nationality="US",
        gender=PersonGender.MALE,
        date_of_birth=date(1990, 1, 1),
        address=address
    )

    # Assert
    assert person.state.first_name == "John"
    assert person.state.last_name == "Doe"
    assert len(person.uncommitted_events) == 1
    assert isinstance(person.uncommitted_events[0], PersonRegisteredEvent)

def test_account_deposit():
    # Arrange
    account = Account("test-account")
    account.open("owner-id", "123456789", Decimal('100.00'))

    # Act
    account.deposit(Decimal('50.00'), "Test deposit")

    # Assert
    assert account.state.balance == Decimal('150.00')
    assert len(account.uncommitted_events) == 2  # Open + Deposit

Integration TestsΒΆ

Test the complete flow:

@pytest.mark.asyncio
async def test_person_registration_flow():
    # Arrange
    client = TestClient(app)
    person_data = {
        "first_name": "John",
        "last_name": "Doe",
        "nationality": "US",
        "gender": "MALE",
        "date_of_birth": "1990-01-01",
        "address": {
            "street": "123 Main St",
            "city": "Anytown",
            "postal_code": "12345",
            "country": "USA"
        }
    }

    # Act
    response = client.post("/api/v1/persons", json=person_data)

    # Assert
    assert response.status_code == 201
    person = response.json()
    assert person["first_name"] == "John"
    assert person["last_name"] == "Doe"

    # Verify person can be retrieved
    get_response = client.get(f"/api/v1/persons/{person['id']}")
    assert get_response.status_code == 200

πŸš€ Running the SampleΒΆ

Start the ApplicationΒΆ

  1. Start infrastructure:
docker-compose up -d
  1. Run the application:
python api/main.py

Example API CallsΒΆ

Register a Person:

curl -X POST "http://localhost:8000/api/v1/persons" \
  -H "Content-Type: application/json" \
  -d '{
    "first_name": "John",
    "last_name": "Doe",
    "nationality": "US",
    "gender": "MALE",
    "date_of_birth": "1990-01-01",
    "address": {
      "street": "123 Main St",
      "city": "Anytown",
      "postal_code": "12345",
      "country": "USA"
    }
  }'

Open an Account:

curl -X POST "http://localhost:8000/api/v1/accounts" \
  -H "Content-Type: application/json" \
  -d '{
    "owner_id": "PERSON_ID_FROM_ABOVE",
    "account_number": "123456789",
    "initial_deposit": 1000.00
  }'

Deposit Money:

curl -X POST "http://localhost:8000/api/v1/accounts/ACCOUNT_ID/deposit" \
  -H "Content-Type: application/json" \
  -d '{
    "amount": 500.00,
    "description": "Salary deposit"
  }'

πŸ“‹ Key LearningsΒΆ

The OpenBank sample demonstrates:

  1. Event Sourcing: How to store state as a sequence of events
  2. CQRS: Separation of write and read models
  3. Domain-Driven Design: Rich domain models with business rules
  4. Clean Architecture: Clear separation of concerns
  5. Event-Driven Architecture: How events enable loose coupling
  6. Repository Pattern: Abstract data access for different storage types
  7. Integration Events: Cross-bounded-context communication