๐ API Gateway Sample Applicationยถ
The API Gateway sample demonstrates how to build a modern microservice gateway using the Neuroglia framework. This application showcases advanced patterns including OAuth2 authentication, external service integration, background task processing, and cloud event handling.
๐ฏ What You'll Learnยถ
- Microservice Gateway Patterns: How to build a centralized API gateway for service orchestration
- OAuth2 Authentication & Authorization: Implementing JWT-based security with Keycloak integration
- External Service Integration: Connecting to multiple external APIs with proper abstraction
- Background Task Processing: Asynchronous task execution with Redis-backed job scheduling
- Object Storage Integration: File management with MinIO S3-compatible storage
- Cloud Events: Event-driven communication between microservices
- Advanced Dependency Injection: Complex service configuration and lifetime management
๐๏ธ Architecture Overviewยถ
graph TB
subgraph "API Gateway Service"
A[PromptController] --> B[Mediator]
B --> C[Command/Query Handlers]
C --> D[Domain Models]
C --> E[Integration Services]
F[OAuth2 Middleware] --> A
G[Cloud Event Middleware] --> A
H[Exception Handling] --> A
end
subgraph "External Dependencies"
I[Keycloak OAuth2]
J[Redis Cache]
K[MinIO Storage]
L[External APIs]
M[Background Tasks]
end
E --> I
E --> J
E --> K
E --> L
C --> M
The API Gateway follows a distributed microservice pattern where:
- Gateway Layer: Centralized entry point for multiple downstream services
- Authentication Layer: OAuth2/JWT-based security with external identity provider
- Integration Layer: Multiple external service clients with proper abstraction
- DTOs: Data transfer objects for external communication
๐ Key Features Demonstratedยถ
1. OAuth2 Authentication & Securityยถ
# JWT token validation with Keycloak
@post("/item", response_model=ItemPromptCommandResponseDto)
async def create_new_item_prompt(
self,
command_dto: CreateNewItemPromptCommandDto,
key: str = Depends(validate_mosaic_authentication)
) -> Any:
# Protected endpoint with API key validation
2. Multi-Service Integrationยถ
# External service clients
MinioStorageManager.configure(builder) # Object storage
MosaicApiClient.configure(builder) # External API
AsyncStringCacheRepository.configure(builder) # Redis caching
BackgroundTaskScheduler.configure(builder) # Async processing
3. Advanced Domain Modelยถ
@map_to(PromptResponseDto)
@dataclass
class PromptResponse:
id: str
prompt_id: str
content: str
status: PromptStatus
metadata: dict[str, Any]
created_at: datetime.datetime
4. Background Task Processingยถ
# Asynchronous task execution
BackgroundTaskScheduler.configure(builder, ["application.tasks"])
# Redis-backed job queue
background_job_store: dict[str, str | int] = {
"redis_host": "redis47",
"redis_port": 6379,
"redis_db": 0
}
5. Cloud Events Integrationยถ
# Event publishing and consumption
CloudEventIngestor.configure(builder, ["application.events.integration"])
CloudEventPublisher.configure(builder)
app.add_middleware(CloudEventMiddleware, service_provider=app.services)
๐ง Configuration & Settingsยถ
Application Settingsยถ
class AiGatewaySettings(ApplicationSettings):
# OAuth2 Configuration
jwt_authority: str = "http://keycloak47/realms/mozart"
jwt_audience: str = "ai-gateways"
required_scope: str = "api"
# External Service Settings
s3_endpoint: str # MinIO storage
connection_strings: dict[str, str] # Redis, databases
mosaic_api_keys: list[str] # API authentication
# Background Processing
background_job_store: dict[str, str | int]
redis_max_connections: int = 10
Service Registrationยถ
# Core framework services
Mapper.configure(builder, application_modules)
Mediator.configure(builder, application_modules)
JsonSerializer.configure(builder)
# Custom application services
AsyncStringCacheRepository.configure(builder, Prompt, str)
BackgroundTaskScheduler.configure(builder, ["application.tasks"])
MinioStorageManager.configure(builder)
LocalFileSystemManager.configure(builder)
# External integrations
builder.services.add_singleton(AiGatewaySettings, singleton=app_settings)
๐งช Testing Strategyยถ
Unit Testsยถ
class TestPromptController:
def setup_method(self):
self.mock_mediator = Mock(spec=Mediator)
self.mock_mapper = Mock(spec=Mapper)
self.controller = PromptController(
service_provider=Mock(),
mapper=self.mock_mapper,
mediator=self.mock_mediator
)
@pytest.mark.asyncio
async def test_create_prompt_success(self):
# Test successful prompt creation
command_dto = CreateNewItemPromptCommandDto(content="test")
result = await self.controller.create_new_item_prompt(command_dto, "valid-key")
assert result.status == "created"
self.mock_mediator.execute_async.assert_called_once()
Integration Testsยถ
@pytest.mark.integration
class TestApiGatewayIntegration:
@pytest.mark.asyncio
async def test_full_prompt_workflow(self, test_client):
# Test complete workflow from API to external services
response = await test_client.post(
"/api/prompts/item",
json={"content": "test prompt"},
headers={"Authorization": "Bearer valid-token"}
)
assert response.status_code == 201
assert "id" in response.json()
๐ Implementation Detailsยถ
1. Controller Layer (api/controllers/)ยถ
- PromptController: Main API endpoints for prompt management
- AppController: Application health and metadata endpoints
- InternalController: Internal service endpoints
- Authentication Schemes: OAuth2 and API key validation
2. Application Layer (application/)ยถ
- Commands: Write operations (CreateNewPromptCommand)
- Queries: Read operations (GetPromptByIdQuery)
- Services: Business logic orchestration
- Tasks: Background job definitions
- Events: Integration event handlers
3. Domain Layer (domain/)ยถ
- Prompt: Core domain entity with business rules
- PromptResponse: Value object for API responses
- Domain Events: Business event definitions
- Validation: Domain-specific validation logic
4. Integration Layer (integration/)ยถ
- External API Clients: Mosaic, GenAI, Mozart APIs
- Storage Services: MinIO object storage, Redis caching
- Background Services: Task scheduling and execution
- DTOs: Data transfer objects for external communication
๐ Background Processingยถ
The API Gateway demonstrates advanced background processing patterns:
# Task scheduling configuration
BackgroundTaskScheduler.configure(builder, ["application.tasks"])
# Redis-backed job store
builder.services.add_singleton(AiGatewaySettings, singleton=app_settings)
# Asynchronous task execution
@task_handler
class ProcessPromptTask:
async def execute_async(self, prompt_id: str):
# Long-running prompt processing
prompt = await self.prompt_service.get_by_id(prompt_id)
result = await self.genai_client.process_prompt(prompt)
await self.storage_service.store_result(result)
๐ External Service Integrationยถ
MinIO Object Storageยถ
class MinioStorageManager:
async def upload_file_async(self, bucket: str, key: str, data: bytes) -> str:
# S3-compatible object storage
return await self.client.put_object(bucket, key, data)
Redis Cachingยถ
class AsyncStringCacheRepository:
async def get_async(self, key: str) -> Optional[str]:
return await self.redis_client.get(key)
async def set_async(self, key: str, value: str, ttl: int = None):
await self.redis_client.set(key, value, ex=ttl)
External API Integrationยถ
class MosaicApiClient:
async def submit_prompt_async(self, prompt: PromptDto) -> PromptResponseDto:
# OAuth2 authenticated API calls
token = await self.get_access_token()
response = await self.http_client.post(
"/api/prompts",
json=prompt.dict(),
headers={"Authorization": f"Bearer {token}"}
)
return PromptResponseDto.parse_obj(response.json())
๐ Getting Startedยถ
1. Prerequisitesยถ
# Install dependencies
pip install -r requirements.txt
# Configure external services
docker-compose up -d redis keycloak minio
2. Configurationยถ
# Set environment variables
export JWT_AUTHORITY="http://localhost:8080/realms/mozart"
export S3_ENDPOINT="http://localhost:9000"
export REDIS_URL="redis://localhost:6379"
3. Run the Applicationยถ
# Start the API Gateway
python samples/api-gateway/main.py
# Access Swagger UI
open http://localhost:8000/docs
4. Test the APIยถ
# Get access token from Keycloak
curl -X POST http://localhost:8080/realms/mozart/protocol/openid-connect/token \
-H "Content-Type: application/x-www-form-urlencoded" \
-d "grant_type=client_credentials&client_id=ai-gateway&client_secret=secret"
# Call protected endpoint
curl -X POST http://localhost:8000/api/prompts/item \
-H "Authorization: Bearer $TOKEN" \
-H "Content-Type: application/json" \
-d '{"content": "Generate a sample prompt"}'
๐ Related Documentationยถ
- CQRS & Mediation - Command/Query patterns
- Dependency Injection - Service configuration
- Data Access - Repository patterns
- OpenBank Sample - Event sourcing comparison
- Desktop Controller Sample - Background services
๐ Comparison with OpenBank Sampleยถ
The API Gateway and OpenBank samples demonstrate different architectural patterns within the Neuroglia framework. Here's a detailed comparison:
Architecture Patternsยถ
| Aspect | API Gateway | OpenBank |
|---|---|---|
| Primary Pattern | Microservice Gateway | Event Sourcing + DDD |
| Data Persistence | Multi-store (Redis, MinIO, MongoDB) | Event Store + Read Models |
| State Management | Stateless with caching | Event-sourced aggregates |
| External Integration | Multiple external APIs | Focused domain model |
| Background Processing | Async task queues | Event-driven projections |
Domain Complexityยถ
API Gateway - Integration-Focusedยถ
# Simple domain model focused on orchestration
@dataclass
class PromptResponse:
id: str
prompt_id: str
content: str
status: PromptStatus
metadata: dict[str, Any]
OpenBank - Rich Domain Modelยถ
# Complex aggregate with business rules
class BankAccountV1(AggregateRoot[str]):
def record_transaction(self, amount: Decimal, transaction_type: BankTransactionTypeV1):
# Complex business logic and invariants
if transaction_type == BankTransactionTypeV1.DEBIT:
if self.state.balance + amount < -self.state.overdraft_limit:
raise InsufficientFundsException()
# Event sourcing
self.raise_event(BankAccountTransactionRecordedDomainEventV1(...))
Data Persistence Strategyยถ
API Gateway - Multi-Store Architectureยถ
# Multiple specialized storage systems
AsyncStringCacheRepository.configure(builder, Prompt, str) # Redis caching
MinioStorageManager.configure(builder) # Object storage
BackgroundTaskScheduler.configure(builder) # Job queue
# Standard CRUD operations
async def save_prompt(self, prompt: Prompt):
await self.cache_repository.set_async(prompt.id, prompt.content)
await self.storage_manager.upload_async(prompt.id, prompt.data)
OpenBank - Event Sourcingยถ
# Event-driven persistence
ESEventStore.configure(builder, EventStoreOptions(database_name, consumer_group))
# Write model: Event sourcing
DataAccessLayer.WriteModel.configure(
builder,
["samples.openbank.domain.models"],
lambda builder_, entity_type, key_type: EventSourcingRepository.configure(...)
)
# Read model: Projections
DataAccessLayer.ReadModel.configure(
builder,
["samples.openbank.integration.models"],
lambda builder_, entity_type, key_type: MongoRepository.configure(...)
)
Authentication & Securityยถ
API Gateway - OAuth2 + API Keysยถ
# Multiple authentication schemes
@post("/item", dependencies=[Depends(validate_mosaic_authentication)])
async def create_item_prompt(self, command_dto: CreateNewItemPromptCommandDto):
# API key validation for external services
@get("/status", dependencies=[Depends(validate_token)])
async def get_status(self):
# JWT token validation for internal services
OpenBank - Domain-Focused Securityยถ
# Business rule enforcement
class BankAccountV1(AggregateRoot[str]):
def record_transaction(self, amount: Decimal, transaction_type: BankTransactionTypeV1):
# Domain-level authorization
if not self.is_authorized_for_transaction(amount):
raise UnauthorizedTransactionException()
External Service Integrationยถ
API Gateway - Extensive Integrationยถ
# Multiple external service clients
class MosaicApiClient:
async def submit_prompt_async(self, prompt: PromptDto) -> PromptResponseDto:
token = await self.oauth_client.get_token_async()
return await self.http_client.post("/api/prompts", prompt, token)
class GenAiClient:
async def process_prompt_async(self, prompt: str) -> str:
return await self.ai_service.generate_response(prompt)
class MinioStorageManager:
async def store_file_async(self, bucket: str, key: str, data: bytes):
return await self.s3_client.put_object(bucket, key, data)
OpenBank - Minimal Integrationยถ
# Focused on domain logic, minimal external dependencies
class CreateBankAccountCommandHandler:
async def handle_async(self, command: CreateBankAccountCommand):
# Pure domain logic without external service calls
owner = await self.person_repository.get_by_id_async(command.owner_id)
account = BankAccountV1(str(uuid.uuid4()), owner, command.initial_balance)
await self.account_repository.save_async(account)
Background Processingยถ
API Gateway - Task Queue Patternยถ
# Redis-backed job queues
BackgroundTaskScheduler.configure(builder, ["application.tasks"])
@task_handler
class ProcessPromptTask:
async def execute_async(self, prompt_id: str):
prompt = await self.prompt_service.get_by_id(prompt_id)
result = await self.genai_client.process_prompt(prompt)
await self.storage_service.store_result(result)
OpenBank - Event-Driven Projectionsยถ
# Event handlers for read model updates
class BankAccountEventHandler:
@event_handler(BankAccountCreatedDomainEventV1)
async def handle_account_created(self, event: BankAccountCreatedDomainEventV1):
projection = BankAccountProjection.from_event(event)
await self.read_model_repository.save_async(projection)
Testing Strategiesยถ
API Gateway - Integration-Heavy Testingยถ
@pytest.mark.integration
class TestApiGatewayIntegration:
async def test_full_prompt_workflow(self, test_client, mock_external_services):
# Test complete workflow including external services
response = await test_client.post("/api/prompts/item", json=prompt_data)
# Verify external service calls
mock_external_services.genai_client.process_prompt.assert_called_once()
mock_external_services.storage_manager.upload.assert_called_once()
OpenBank - Domain-Focused Testingยถ
class TestBankAccountAggregate:
def test_transaction_recording(self):
# Pure domain logic testing
account = BankAccountV1("123", owner, Decimal("1000"))
account.record_transaction(Decimal("-100"), BankTransactionTypeV1.DEBIT)
# Verify business rules and events
assert account.state.balance == Decimal("900")
events = account.get_uncommitted_events()
assert isinstance(events[-1], BankAccountTransactionRecordedDomainEventV1)
Use Case Recommendationsยถ
Choose API Gateway Pattern whenยถ
- โ Building microservice orchestration layers
- โ Integrating multiple external services
- โ Need background job processing
- โ Require complex authentication schemes
- โ Working with heterogeneous data stores
- โ Building service mesh entry points
Choose Event Sourcing Pattern whenยถ
- โ Need complete audit trails
- โ Complex business logic and invariants
- โ Temporal queries are important
- โ Regulatory compliance requirements
- โ High consistency requirements
- โ Rich domain models with behavior
Framework Features Utilizedยถ
| Feature | API Gateway Usage | OpenBank Usage |
|---|---|---|
| CQRS/Mediation | Service orchestration | Domain command/query separation |
| Dependency Injection | External service clients | Repository abstractions |
| Event Handling | Integration events | Domain events + projections |
| Data Access | Multi-repository pattern | Event sourcing + read models |
| Background Processing | Async task queues | Event-driven handlers |
| Mapping | DTO transformations | Domain-to-DTO mapping |
| Validation | API contract validation | Business rule enforcement |
Both samples showcase different strengths of the Neuroglia framework, demonstrating its flexibility in supporting various architectural patterns while maintaining clean architecture principles.