π‘οΈ RBAC & Authorization GuideΒΆ
This guide provides comprehensive patterns for implementing Role-Based Access Control (RBAC) in Neuroglia applications, with practical examples showing how to secure commands, queries, and resources.
π― What is RBAC?ΒΆ
Role-Based Access Control (RBAC) is an authorization approach where access decisions are based on the roles assigned to users. Instead of granting permissions directly to users, permissions are assigned to roles, and roles are assigned to users.
RBAC Core ConceptsΒΆ
- User: An individual with specific roles (e.g., john@company.com)
- Role: A named collection of permissions (e.g., "admin", "manager", "customer")
- Permission: Specific action on a resource (e.g., "orders:create", "kitchen:manage")
- Resource: Entity being accessed (e.g., Order, Pizza, User Account)
Why RBAC?ΒΆ
β Simplified Management: Assign roles instead of individual permissions β Scalability: Add new users without reconfiguring permissions β Principle of Least Privilege: Users only get access they need β Audit Trail: Easy to track who can do what β Compliance: Meets regulatory requirements (SOC 2, GDPR, etc.)
ποΈ RBAC Architecture in NeurogliaΒΆ
Authorization in the Application LayerΒΆ
Key Principle: In Neuroglia, authorization happens in the Application Layer (handlers), not at the API layer (controllers).
Why?
β Business Logic Ownership: Authorization is a business rule β Testability: Easy to unit test without HTTP infrastructure β Reusability: Same auth logic works across different interfaces (REST, GraphQL, gRPC) β Fine-Grained Control: Can implement complex resource-level authorization
graph LR
subgraph "API Layer"
Controller["π Controller<br/>(Thin Layer)"]
end
subgraph "Application Layer"
Handler["βοΈ Handler<br/>(RBAC Logic Here)"]
AuthCheck{"π‘οΈ Authorization<br/>Check"}
end
subgraph "Domain Layer"
Entity["ποΈ Entity"]
Repo["π¦ Repository"]
end
Controller -->|"Pass user context"| Handler
Handler --> AuthCheck
AuthCheck -->|"Authorized"| Entity
AuthCheck -->|"Forbidden"| Controller
Entity --> Repo
style AuthCheck fill:#ffccbc
style Handler fill:#fff9c4
User Context FlowΒΆ
sequenceDiagram
participant Client as π€ Client
participant API as π API Controller
participant Middleware as π JWT Middleware
participant Handler as βοΈ Command/Query Handler
participant Domain as ποΈ Domain Layer
Client->>+API: Request with JWT<br/>Authorization: Bearer {token}
API->>+Middleware: Validate JWT
Middleware->>Middleware: Decode token<br/>(extract user, roles)
Middleware-->>-API: User context<br/>{user_id, username, roles}
API->>API: Create Command/Query<br/>with user context
API->>+Handler: Execute via Mediator
Handler->>Handler: Check authorization<br/>based on roles
alt Authorized
Handler->>+Domain: Execute business logic
Domain-->>-Handler: Result
Handler-->>API: OperationResult (Success)
else Forbidden
Handler-->>API: OperationResult (Forbidden)
end
API-->>-Client: HTTP Response<br/>(200 OK or 403 Forbidden)
π JWT Token Structure for RBACΒΆ
Token ClaimsΒΆ
A typical JWT for RBAC contains:
{
"sub": "550e8400-e29b-41d4-a716-446655440000",
"username": "mario.rossi",
"email": "mario.rossi@example.com",
"roles": ["customer", "vip"],
"permissions": ["orders:read", "orders:create", "menu:read"],
"department": "sales",
"organization_id": "acme-corp",
"exp": 1730494800,
"iat": 1730491200,
"iss": "https://auth.mariospizzeria.com",
"aud": "pizzeria-api"
}
Extracting User ContextΒΆ
In FastAPI Controller:
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import jwt
security = HTTPBearer()
def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)) -> dict:
"""Extract and validate user information from JWT."""
token = credentials.credentials
try:
payload = jwt.decode(
token,
settings.JWT_SECRET_KEY,
algorithms=["HS256"],
audience=settings.JWT_AUDIENCE
)
return {
"user_id": payload.get("sub"),
"username": payload.get("username"),
"email": payload.get("email"),
"roles": payload.get("roles", []),
"permissions": payload.get("permissions", []),
"department": payload.get("department"),
"organization_id": payload.get("organization_id")
}
except jwt.ExpiredSignatureError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token has expired"
)
except jwt.InvalidTokenError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token"
)
class OrdersController(ControllerBase):
@post("/", response_model=OrderDto, status_code=201)
async def create_order(
self,
create_order_dto: CreateOrderDto,
user: dict = Depends(get_current_user)
) -> OrderDto:
"""Create order with user context."""
command = CreateOrderCommand(
customer_id=create_order_dto.customer_id,
items=create_order_dto.items,
user_context=user # Pass user context to handler
)
result = await self.mediator.execute_async(command)
return self.process(result)
π― RBAC Implementation PatternsΒΆ
Pattern 1: Role-Based AuthorizationΒΆ
Check if user has specific role(s):
from neuroglia.mediation import CommandHandler, Command
from neuroglia.core import OperationResult
from dataclasses import dataclass
@dataclass
class DeleteOrderCommand(Command[OperationResult[bool]]):
order_id: str
user_context: dict
class DeleteOrderHandler(CommandHandler[DeleteOrderCommand, OperationResult[bool]]):
def __init__(self, order_repository: OrderRepository):
super().__init__()
self.order_repository = order_repository
async def handle_async(self, command: DeleteOrderCommand) -> OperationResult[bool]:
"""Only admins can delete orders."""
# Authorization check
if not self._has_role(command.user_context, "admin"):
return self.forbidden("Only administrators can delete orders")
# Business logic
await self.order_repository.delete_async(command.order_id)
return self.ok(True)
def _has_role(self, user_context: dict, role: str) -> bool:
"""Check if user has specific role."""
return role in user_context.get("roles", [])
Pattern 2: Permission-Based AuthorizationΒΆ
Check if user has specific permission(s):
@dataclass
class UpdateMenuPricesCommand(Command[OperationResult[None]]):
price_updates: dict[str, Decimal]
user_context: dict
class UpdateMenuPricesHandler(CommandHandler[UpdateMenuPricesCommand, OperationResult[None]]):
async def handle_async(self, command: UpdateMenuPricesCommand) -> OperationResult[None]:
"""Check permission to update menu prices."""
# Permission-based authorization
if not self._has_permission(command.user_context, "menu:update:prices"):
return self.forbidden("Insufficient permissions to update menu prices")
# Business logic
for pizza_id, new_price in command.price_updates.items():
await self.pizza_repository.update_price_async(pizza_id, new_price)
return self.ok(None)
def _has_permission(self, user_context: dict, permission: str) -> bool:
"""Check if user has specific permission."""
return permission in user_context.get("permissions", [])
Pattern 3: Resource-Level AuthorizationΒΆ
Check ownership or relationship to resource:
@dataclass
class GetOrderQuery(Query[OperationResult[OrderDto]]):
order_id: str
user_context: dict
class GetOrderHandler(QueryHandler[GetOrderQuery, OperationResult[OrderDto]]):
async def handle_async(self, query: GetOrderQuery) -> OperationResult[OrderDto]:
"""Get order with resource-level authorization."""
order = await self.order_repository.get_by_id_async(query.order_id)
if not order:
return self.not_found(f"Order {query.order_id} not found")
# Resource-level authorization
if not self._can_access_order(query.user_context, order):
return self.forbidden("You do not have access to this order")
order_dto = self.mapper.map(order, OrderDto)
return self.ok(order_dto)
def _can_access_order(self, user_context: dict, order: Order) -> bool:
"""Check if user can access specific order."""
user_roles = user_context.get("roles", [])
user_id = user_context.get("user_id")
# Admins can see all orders
if "admin" in user_roles or "kitchen_manager" in user_roles:
return True
# Customers can only see their own orders
if "customer" in user_roles:
return order.customer_id == user_id
# Delivery drivers can see orders assigned to them
if "delivery" in user_roles:
return order.assigned_driver_id == user_id
return False
Pattern 4: Multi-Role AuthorizationΒΆ
Allow access if user has ANY of the required roles:
@dataclass
class ViewKitchenDashboardQuery(Query[OperationResult[KitchenDashboardDto]]):
user_context: dict
class ViewKitchenDashboardHandler(QueryHandler[ViewKitchenDashboardQuery, OperationResult[KitchenDashboardDto]]):
ALLOWED_ROLES = ["admin", "kitchen_manager", "chef", "cook"]
async def handle_async(self, query: ViewKitchenDashboardQuery) -> OperationResult[KitchenDashboardDto]:
"""Kitchen dashboard accessible by multiple roles."""
# Multi-role authorization
if not self._has_any_role(query.user_context, self.ALLOWED_ROLES):
return self.forbidden("Access to kitchen dashboard denied")
# Fetch dashboard data
dashboard = await self._build_dashboard()
return self.ok(dashboard)
def _has_any_role(self, user_context: dict, allowed_roles: list[str]) -> bool:
"""Check if user has any of the allowed roles."""
user_roles = set(user_context.get("roles", []))
return bool(user_roles & set(allowed_roles))
Pattern 5: Hierarchical Role AuthorizationΒΆ
Implement role hierarchy where higher roles inherit lower role permissions:
class RoleHierarchy:
"""Define role hierarchy for authorization."""
HIERARCHY = {
"admin": ["kitchen_manager", "delivery_manager", "customer_service", "customer"],
"kitchen_manager": ["chef", "cook"],
"delivery_manager": ["delivery"],
"customer_service": ["customer"],
"chef": ["cook"],
}
@classmethod
def has_role_or_higher(cls, user_roles: list[str], required_role: str) -> bool:
"""Check if user has required role or any higher role."""
# Direct role match
if required_role in user_roles:
return True
# Check if user has higher role
for user_role in user_roles:
if cls._is_role_higher(user_role, required_role):
return True
return False
@classmethod
def _is_role_higher(cls, user_role: str, required_role: str) -> bool:
"""Check if user_role is higher than required_role in hierarchy."""
subordinates = cls.HIERARCHY.get(user_role, [])
if required_role in subordinates:
return True
# Check recursively
for subordinate in subordinates:
if cls._is_role_higher(subordinate, required_role):
return True
return False
# Usage in handler
class StartCookingHandler(CommandHandler[StartCookingCommand, OperationResult[None]]):
async def handle_async(self, command: StartCookingCommand) -> OperationResult[None]:
"""Start cooking pizza (requires cook role or higher)."""
user_roles = command.user_context.get("roles", [])
# Check hierarchical authorization
if not RoleHierarchy.has_role_or_higher(user_roles, "cook"):
return self.forbidden("Insufficient role to start cooking")
# Business logic...
return self.ok(None)
Pattern 6: Context-Aware AuthorizationΒΆ
Authorization decisions based on current application state:
@dataclass
class CancelOrderCommand(Command[OperationResult[None]]):
order_id: str
user_context: dict
class CancelOrderHandler(CommandHandler[CancelOrderCommand, OperationResult[None]]):
async def handle_async(self, command: CancelOrderCommand) -> OperationResult[None]:
"""Cancel order with context-aware authorization."""
order = await self.order_repository.get_by_id_async(command.order_id)
if not order:
return self.not_found("Order not found")
user_roles = command.user_context.get("roles", [])
user_id = command.user_context.get("user_id")
# Context-aware authorization
can_cancel = self._can_cancel_order(order, user_roles, user_id)
if not can_cancel:
return self.forbidden("Cannot cancel order in current state")
# Business logic
order.cancel()
await self.order_repository.update_async(order)
return self.ok(None)
def _can_cancel_order(self, order: Order, user_roles: list[str], user_id: str) -> bool:
"""Complex authorization logic based on order state and user context."""
# Admins can always cancel
if "admin" in user_roles:
return True
# Customers can cancel their own orders if not yet cooking
if "customer" in user_roles:
return (
order.customer_id == user_id and
order.status in [OrderStatus.PENDING, OrderStatus.CONFIRMED]
)
# Kitchen managers can cancel while cooking
if "kitchen_manager" in user_roles:
return order.status in [OrderStatus.PENDING, OrderStatus.CONFIRMED, OrderStatus.COOKING]
return False
π Integration with KeycloakΒΆ
Keycloak SetupΒΆ
Keycloak is the recommended identity provider for production Neuroglia applications.
Realm Configuration:
{
"realm": "mario-pizzeria",
"enabled": true,
"clients": [
{
"clientId": "pizzeria-api",
"enabled": true,
"protocol": "openid-connect",
"publicClient": false,
"standardFlowEnabled": true,
"directAccessGrantsEnabled": true,
"bearerOnly": false
}
],
"roles": {
"realm": [
{ "name": "admin" },
{ "name": "kitchen_manager" },
{ "name": "chef" },
{ "name": "cook" },
{ "name": "delivery" },
{ "name": "customer" }
]
}
}
Keycloak JWT ValidationΒΆ
from jose import jwt, JWTError
from typing import Optional
import httpx
class KeycloakAuthService:
"""Validate Keycloak JWT tokens."""
def __init__(self, keycloak_url: str, realm: str):
self.keycloak_url = keycloak_url
self.realm = realm
self.public_key: Optional[str] = None
async def get_public_key(self) -> str:
"""Fetch Keycloak public key for JWT validation."""
if self.public_key:
return self.public_key
url = f"{self.keycloak_url}/realms/{self.realm}"
async with httpx.AsyncClient() as client:
response = await client.get(url)
realm_info = response.json()
self.public_key = f"-----BEGIN PUBLIC KEY-----\n{realm_info['public_key']}\n-----END PUBLIC KEY-----"
return self.public_key
async def validate_token(self, token: str) -> dict:
"""Validate Keycloak JWT and extract claims."""
public_key = await self.get_public_key()
try:
payload = jwt.decode(
token,
public_key,
algorithms=["RS256"],
audience="pizzeria-api",
options={"verify_aud": True}
)
# Extract roles from Keycloak token structure
resource_access = payload.get("resource_access", {})
client_roles = resource_access.get("pizzeria-api", {}).get("roles", [])
realm_roles = payload.get("realm_access", {}).get("roles", [])
return {
"user_id": payload.get("sub"),
"username": payload.get("preferred_username"),
"email": payload.get("email"),
"roles": client_roles + realm_roles,
"permissions": payload.get("scope", "").split(),
"token_type": payload.get("typ"),
"expires_at": payload.get("exp")
}
except JWTError as e:
raise UnauthorizedException(f"Invalid token: {str(e)}")
FastAPI Dependency with KeycloakΒΆ
from fastapi import Depends, HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
security = HTTPBearer()
async def get_current_user_keycloak(
credentials: HTTPAuthorizationCredentials = Depends(security),
auth_service: KeycloakAuthService = Depends()
) -> dict:
"""Validate Keycloak token and return user context."""
try:
user_context = await auth_service.validate_token(credentials.credentials)
return user_context
except UnauthorizedException as e:
raise HTTPException(status_code=401, detail=str(e))
# Usage in controller
class OrdersController(ControllerBase):
@get("/", response_model=List[OrderDto])
async def get_orders(
self,
user: dict = Depends(get_current_user_keycloak)
) -> List[OrderDto]:
"""Get orders with Keycloak authentication."""
query = GetOrdersQuery(user_context=user)
result = await self.mediator.execute_async(query)
return self.process(result)
π§ͺ Testing RBACΒΆ
Unit Testing Handlers with RBACΒΆ
import pytest
from unittest.mock import Mock, AsyncMock
@pytest.mark.asyncio
class TestDeleteOrderHandler:
async def test_admin_can_delete_order(self):
"""Admins should be able to delete any order."""
# Arrange
order_repo = Mock(spec=OrderRepository)
order_repo.delete_async = AsyncMock()
handler = DeleteOrderHandler(order_repo)
command = DeleteOrderCommand(
order_id="order-123",
user_context={
"user_id": "admin-user",
"username": "admin",
"roles": ["admin"]
}
)
# Act
result = await handler.handle_async(command)
# Assert
assert result.is_success
order_repo.delete_async.assert_called_once_with("order-123")
async def test_customer_cannot_delete_order(self):
"""Customers should not be able to delete orders."""
# Arrange
order_repo = Mock(spec=OrderRepository)
handler = DeleteOrderHandler(order_repo)
command = DeleteOrderCommand(
order_id="order-123",
user_context={
"user_id": "customer-user",
"username": "customer",
"roles": ["customer"]
}
)
# Act
result = await handler.handle_async(command)
# Assert
assert not result.is_success
assert result.status_code == 403
assert "administrator" in result.error_message.lower()
order_repo.delete_async.assert_not_called()
Integration Testing with JWTΒΆ
import pytest
from fastapi.testclient import TestClient
import jwt
from datetime import datetime, timedelta
@pytest.fixture
def test_jwt_token():
"""Generate test JWT token."""
payload = {
"sub": "test-user-id",
"username": "test_user",
"roles": ["customer"],
"exp": datetime.utcnow() + timedelta(hours=1),
"iat": datetime.utcnow()
}
return jwt.encode(payload, "test-secret", algorithm="HS256")
@pytest.fixture
def test_admin_token():
"""Generate admin JWT token."""
payload = {
"sub": "admin-user-id",
"username": "admin_user",
"roles": ["admin"],
"exp": datetime.utcnow() + timedelta(hours=1),
"iat": datetime.utcnow()
}
return jwt.encode(payload, "test-secret", algorithm="HS256")
def test_customer_can_view_own_orders(test_client: TestClient, test_jwt_token: str):
"""Test customer authorization for viewing orders."""
headers = {"Authorization": f"Bearer {test_jwt_token}"}
response = test_client.get("/api/orders", headers=headers)
assert response.status_code == 200
orders = response.json()
assert isinstance(orders, list)
def test_customer_cannot_delete_orders(test_client: TestClient, test_jwt_token: str):
"""Test customer cannot delete orders."""
headers = {"Authorization": f"Bearer {test_jwt_token}"}
response = test_client.delete("/api/orders/order-123", headers=headers)
assert response.status_code == 403
assert "administrator" in response.json()["detail"].lower()
def test_admin_can_delete_orders(test_client: TestClient, test_admin_token: str):
"""Test admin can delete orders."""
headers = {"Authorization": f"Bearer {test_admin_token}"}
response = test_client.delete("/api/orders/order-123", headers=headers)
assert response.status_code == 200
π Best PracticesΒΆ
1. Authorization at Application LayerΒΆ
β DO: Implement authorization in handlers
class Handler:
async def handle_async(self, command):
if not self._authorized(command.user_context):
return self.forbidden("Access denied")
β DON'T: Implement authorization in controllers
@get("/orders")
async def get_orders(user: dict = Depends(get_current_user)):
if "admin" not in user["roles"]: # β Wrong place!
raise HTTPException(403)
2. Use Composition Over DuplicationΒΆ
Create reusable authorization helpers:
class AuthorizationHelper:
"""Reusable authorization logic."""
@staticmethod
def has_role(user_context: dict, role: str) -> bool:
return role in user_context.get("roles", [])
@staticmethod
def has_any_role(user_context: dict, roles: list[str]) -> bool:
user_roles = set(user_context.get("roles", []))
return bool(user_roles & set(roles))
@staticmethod
def has_permission(user_context: dict, permission: str) -> bool:
return permission in user_context.get("permissions", [])
@staticmethod
def is_resource_owner(user_context: dict, resource_owner_id: str) -> bool:
return user_context.get("user_id") == resource_owner_id
3. Fail SecurelyΒΆ
Default to deny access:
def _can_access(self, user_context: dict, resource: Resource) -> bool:
"""Default to deny access."""
# Explicit allow conditions
if self._is_admin(user_context):
return True
if self._is_owner(user_context, resource):
return True
# Default deny
return False # β
Fail securely
4. Audit Authorization DecisionsΒΆ
Log authorization failures for security monitoring:
async def handle_async(self, command: Command) -> OperationResult:
if not self._authorized(command.user_context):
self.logger.warning(
f"Authorization failed: User {command.user_context['username']} "
f"attempted {command.__class__.__name__} without sufficient permissions"
)
return self.forbidden("Access denied")
5. Keep Roles and Permissions ConfigurableΒΆ
Don't hardcode role names:
# settings.py
class AuthorizationSettings:
ADMIN_ROLES = ["admin", "super_admin"]
KITCHEN_ROLES = ["admin", "kitchen_manager", "chef", "cook"]
DELIVERY_ROLES = ["admin", "delivery_manager", "delivery"]
# handler.py
class Handler:
def __init__(self, auth_settings: AuthorizationSettings):
self.auth_settings = auth_settings
def _is_admin(self, user_context: dict) -> bool:
user_roles = set(user_context.get("roles", []))
admin_roles = set(self.auth_settings.ADMIN_ROLES)
return bool(user_roles & admin_roles)
π Related DocumentationΒΆ
- OAuth & JWT Reference - Comprehensive authentication guide
- Simple UI Sample - Complete RBAC implementation example
- Mario's Pizzeria Authentication - Step-by-step auth tutorial
- CQRS Pattern - Command and query separation
π‘ SummaryΒΆ
RBAC in Neuroglia follows these principles:
- Authorization in Application Layer: Handlers contain authorization logic
- User Context from JWT: Extract roles and permissions from token
- Multiple Authorization Patterns: Role-based, permission-based, resource-level
- Keycloak Integration: Production-ready identity management
- Testable: Easy to unit test without HTTP infrastructure
- Fail Securely: Default to deny access
By following these patterns, you can build secure, maintainable, and testable authorization into your Neuroglia applications.