
Metrics Instrumentation Guide

This guide covers how to implement metrics instrumentation in the Starter App using OpenTelemetry.

Overview

Metrics are numerical measurements aggregated over time that help you understand system behavior and performance.

When to Use Metrics

Use metrics for:

  • Counting events: Tasks created, API requests, errors
  • Measuring rates: Requests per second, throughput
  • Tracking distributions: Response times, payload sizes
  • Monitoring resources: CPU usage, memory, database connections
  • Alerting: Trigger alerts when values cross thresholds

Don't use metrics for:

  • ❌ High-cardinality data (unique user IDs, session IDs)
  • ❌ Debugging individual requests (use traces)
  • ❌ Storing events (use logs)

Architecture

graph LR
    Code[Application Code]
    Meter[OpenTelemetry Meter]
    Instruments[Metric Instruments]
    Reader[Metric Reader]
    Exporter[OTLP Exporter]
    Collector[OTEL Collector]
    Backend[Backend: Prometheus]

    Code -->|Create| Meter
    Meter -->|Create| Instruments
    Instruments -->|Record| Reader
    Reader -->|Export| Exporter
    Exporter -->|Send| Collector
    Collector -->|Forward| Backend

    style Code fill:#4CAF50
    style Instruments fill:#2196F3
    style Collector fill:#FF9800
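
In the Starter App this pipeline is configured for you (see Step 5 below), but as a reference, a minimal sketch of wiring the same Reader and Exporter chain by hand with the OpenTelemetry SDK could look like this (assuming the opentelemetry-sdk and opentelemetry-exporter-otlp packages; the endpoint value is illustrative):

from opentelemetry import metrics
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter

# The reader periodically collects from all instruments and hands batches to the exporter
reader = PeriodicExportingMetricReader(
    OTLPMetricExporter(endpoint="http://otel-collector:4317", insecure=True),
    export_interval_millis=60_000,
)

# Meters obtained after this point record into the configured pipeline
metrics.set_meter_provider(MeterProvider(metric_readers=[reader]))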

Metric Types

OpenTelemetry provides several metric instruments, each for different use cases:

1. Counter

What: Monotonically increasing value (only goes up).

When: Counting events that accumulate over time.

Examples:

  • Total tasks created
  • Total API requests
  • Total errors

API:

from opentelemetry import metrics

meter = metrics.get_meter(__name__)

# Create counter
tasks_created = meter.create_counter(
    name="starter_app.tasks.created",
    description="Total tasks created",
    unit="1"
)

# Record value (always positive)
tasks_created.add(1, {"priority": "high", "status": "pending"})

Visualization: Use rate() or increase() to see rate of change.

# Tasks created per second
rate(starter_app_tasks_created_total[5m])

# Total tasks created in last hour
increase(starter_app_tasks_created_total[1h])

2. UpDownCounter

What: Value that can increase or decrease.

When: Tracking values that go up and down.

Examples:

  • Active connections
  • Queue length
  • Number of items in cache

API:

active_tasks = meter.create_up_down_counter(
    name="starter_app.tasks.active",
    description="Number of active (non-completed) tasks",
    unit="1"
)

# Increase
active_tasks.add(1, {"department": "engineering"})

# Decrease
active_tasks.add(-1, {"department": "engineering"})

Visualization: Query the metric directly to see the current count.

# Current active tasks
starter_app_tasks_active

# Average active tasks over time
avg_over_time(starter_app_tasks_active[5m])

3. Histogram

What: Distribution of values over time.

When: Measuring latency, size, or any value you want to analyze statistically.

Examples:

  • Request duration
  • Payload size
  • Database query time

API:

task_processing_time = meter.create_histogram(
    name="starter_app.task.processing_time",
    description="Time to process tasks",
    unit="ms"
)

# Record observation
processing_time_ms = (time.time() - start_time) * 1000
task_processing_time.record(
    processing_time_ms,
    {"operation": "create", "priority": "high"}
)

Visualization: Use percentiles, averages, histograms.

# 95th percentile processing time
histogram_quantile(0.95,
  rate(starter_app_task_processing_time_bucket[5m]))

# Average processing time
rate(starter_app_task_processing_time_sum[5m]) /
rate(starter_app_task_processing_time_count[5m])
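
Percentile accuracy depends on the histogram's bucket boundaries. If the defaults are too coarse for your latencies, the OpenTelemetry SDK lets you override them per instrument with a View. A minimal sketch, assuming the opentelemetry-sdk package (the boundary values below are illustrative, in milliseconds):

from opentelemetry import metrics
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import ConsoleMetricExporter, PeriodicExportingMetricReader
from opentelemetry.sdk.metrics.view import View, ExplicitBucketHistogramAggregation

# Custom buckets for the processing-time histogram (values in ms)
latency_view = View(
    instrument_name="starter_app.task.processing_time",
    aggregation=ExplicitBucketHistogramAggregation(
        boundaries=[5, 10, 25, 50, 100, 250, 500, 1000, 2500]
    ),
)

reader = PeriodicExportingMetricReader(ConsoleMetricExporter())
metrics.set_meter_provider(MeterProvider(metric_readers=[reader], views=[latency_view]))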

4. Gauge (Observable)

What: Current value at a point in time.

When: Measuring instantaneous values from external sources.

Examples:

  • Current memory usage
  • Database connection pool size
  • Queue depth

API:

from opentelemetry.metrics import CallbackOptions, Observation

def get_memory_usage(options: CallbackOptions):
    """Callback that yields the current memory usage as an Observation."""
    import psutil
    rss_mb = psutil.Process().memory_info().rss / 1024 / 1024  # MB
    yield Observation(rss_mb)

# Create observable gauge
meter.create_observable_gauge(
    name="starter_app.memory.usage",
    description="Current memory usage",
    unit="MB",
    callbacks=[get_memory_usage]
)

Note: The SDK invokes the callback periodically (on each collection/export cycle); your application code never calls it directly.
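
If one callback needs to report several related values (for example the connection pool usage mentioned above, split into in-use and idle connections), it can yield one Observation per attribute set. A sketch, where pool is a hypothetical object exposing in_use and idle counts:

from opentelemetry.metrics import CallbackOptions, Observation

def get_pool_usage(options: CallbackOptions):
    """Yield one observation per connection state (pool is hypothetical)."""
    yield Observation(pool.in_use, {"state": "in_use"})
    yield Observation(pool.idle, {"state": "idle"})

meter.create_observable_gauge(
    name="starter_app.db.pool.connections",
    description="Database connection pool usage",
    unit="1",
    callbacks=[get_pool_usage]
)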

Comparison Table

| Type | Direction | Use Case | Example |
| --- | --- | --- | --- |
| Counter | Up only | Cumulative events | Total requests |
| UpDownCounter | Up/Down | Current count | Active connections |
| Histogram | Observations | Value distributions | Request latency |
| Gauge | Current value | Instantaneous reading | Memory usage |

Implementation

Step 1: Get a Meter

Create a meter for your module:

# src/observability/metrics.py or in your module
from opentelemetry import metrics

# One meter per module/component
meter = metrics.get_meter(__name__)

Best Practice: Use __name__ to namespace your metrics by module.

Step 2: Create Metric Instruments

Define instruments at module level (not in functions):

# src/observability/metrics.py
from opentelemetry import metrics

meter = metrics.get_meter(__name__)

# Counters
tasks_created = meter.create_counter(
    name="starter_app.tasks.created",
    description="Total tasks created",
    unit="1"
)

tasks_completed = meter.create_counter(
    name="starter_app.tasks.completed",
    description="Total tasks completed",
    unit="1"
)

tasks_failed = meter.create_counter(
    name="starter_app.tasks.failed",
    description="Total task failures",
    unit="1"
)

# Histograms
task_processing_time = meter.create_histogram(
    name="starter_app.task.processing_time",
    description="Time to process tasks",
    unit="ms"
)

Why module level?

  • Instruments are lightweight
  • Create once, use many times
  • Better performance
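
For contrast, a sketch of the anti-pattern this avoids: re-creating the instrument inside the handler on every request instead of defining it once at import time (the class and method names mirror the examples below):

from opentelemetry import metrics

class CreateTaskCommandHandler:
    async def handle_async(self, command):
        # Anti-pattern: a new counter is requested on every call
        counter = metrics.get_meter(__name__).create_counter(
            name="starter_app.tasks.created",
            unit="1"
        )
        counter.add(1)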

Step 3: Record Measurements

Import and use instruments in your code:

# src/application/commands/create_task_command.py
import time
from observability.metrics import tasks_created, task_processing_time

class CreateTaskCommandHandler:
    async def handle_async(self, command: CreateTaskCommand):
        start_time = time.time()

        # Business logic
        task = await self.create_task(command)

        # Record metrics
        processing_time_ms = (time.time() - start_time) * 1000

        tasks_created.add(
            1,
            {
                "priority": task.priority.value,
                "status": task.status.value,
                "has_assignee": bool(task.assignee_id),
            }
        )

        task_processing_time.record(
            processing_time_ms,
            {"operation": "create", "priority": task.priority.value}
        )

        return task

Step 4: Add Attributes (Labels)

Attributes provide dimensions for filtering and grouping:

# Good: Low-cardinality attributes
tasks_created.add(1, {
    "priority": "high",        # 3 values: high, medium, low
    "status": "pending",       # 4 values: pending, in_progress, completed, failed
    "department": "engineering" # ~10 values
})

# Bad: High-cardinality attributes
tasks_created.add(1, {
    "task_id": "uuid-1234",    # ❌ Millions of unique values!
    "user_id": "user-5678",    # ❌ Thousands of unique values!
})

Why avoid high cardinality?

  • Explodes storage costs
  • Slows down queries
  • Metrics backends charge per unique series

Step 5: Configure Export

Metrics are exported automatically by the OpenTelemetry SDK, which is configured in main.py:

# src/main.py
from neuroglia.observability import Observability

def create_app() -> FastAPI:
    builder = WebApplicationBuilder(app_settings=app_settings)

    # Auto-configures metrics export
    Observability.configure(builder)

Configuration (via environment variables):

# Export to OTEL Collector
OTEL_METRICS_EXPORTER=otlp
OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4317

# Or export to console (development)
OTEL_METRICS_EXPORTER=console

Best Practices

1. Naming Conventions

Follow OpenTelemetry semantic conventions:

Format: {namespace}.{component}.{metric_name}

# Good examples
"starter_app.tasks.created"           # namespace.component.metric
"starter_app.task.processing_time"
"starter_app.database.queries"
"starter_app.cache.hits"

# Bad examples
"TasksCreated"                        # No namespace, capitalized
"tasks"                               # Too vague
"starter_app_tasks_created_total"     # Wrong separator (use dots)

Units: Use standard units

# Time
unit="ms"        # milliseconds
unit="s"         # seconds

# Size
unit="By"        # bytes
unit="MB"        # megabytes

# Count
unit="1"         # dimensionless count
unit="{items}"   # count of items

# Percentage
unit="%"         # percentage (0-100)

2. Instrument Naming

Be descriptive and consistent:

# Counters: Use noun (what is being counted)
tasks_created = meter.create_counter("starter_app.tasks.created")
requests_failed = meter.create_counter("starter_app.requests.failed")

# Histograms: Use noun describing measured value
request_duration = meter.create_histogram("starter_app.request.duration")
payload_size = meter.create_histogram("starter_app.payload.size")

3. Attribute Design

Use categorical attributes:

# Good: Limited set of values
{
    "priority": "high",          # 3 values
    "status": "completed",       # 4 values
    "operation": "create",       # CRUD operations
    "error_type": "validation",  # ~10 error types
}

# Bad: Unique values
{
    "task_id": "uuid",           # Millions
    "user_email": "user@...",    # Thousands
    "timestamp": "2024-...",     # Infinite
}

Keep attribute count low:

# Good: 2-4 attributes
tasks_created.add(1, {"priority": "high", "status": "pending"})

# Bad: Too many attributes
tasks_created.add(1, {
    "priority": "high",
    "status": "pending",
    "department": "eng",
    "assignee": "john",
    "created_by": "admin",
    "project": "alpha",
    # ... 10 more attributes
})

Cardinality calculation:

Total series = attribute1_values × attribute2_values × ... × attributeN_values

Example:
priority (3) × status (4) × department (10) = 120 series ✅

vs.

priority (3) × user_id (10,000) = 30,000 series ❌

4. Recording Patterns

Pattern 1: Increment Counter

# Simple increment
tasks_created.add(1)

# With attributes
tasks_created.add(1, {"priority": "high"})

# Conditional increment
if task.priority == TaskPriority.HIGH:
    high_priority_tasks.add(1)

Pattern 2: Timing Operations

import time

start_time = time.time()

# Perform operation
result = await perform_operation()

# Record duration
duration_ms = (time.time() - start_time) * 1000
operation_duration.record(duration_ms, {"operation": "process"})

Pattern 3: Context Manager

import time
from contextlib import contextmanager

@contextmanager
def record_duration(histogram, attributes):
    start_time = time.time()
    try:
        yield
    finally:
        duration_ms = (time.time() - start_time) * 1000
        histogram.record(duration_ms, attributes)

# Usage
with record_duration(task_processing_time, {"operation": "create"}):
    await create_task(command)

Pattern 4: Decorator

import time
from functools import wraps

from observability.metrics import task_processing_time

def record_execution_time(operation: str):
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            start_time = time.time()
            try:
                result = await func(*args, **kwargs)
                duration_ms = (time.time() - start_time) * 1000
                task_processing_time.record(
                    duration_ms,
                    {"operation": operation, "status": "success"}
                )
                return result
            except Exception:
                duration_ms = (time.time() - start_time) * 1000
                task_processing_time.record(
                    duration_ms,
                    {"operation": operation, "status": "error"}
                )
                raise
        return wrapper
    return decorator

# Usage
@record_execution_time("create_task")
async def create_task(command):
    ...

5. Error Handling

Always record success and failure:

try:
    result = await perform_operation()
    operations_successful.add(1, {"operation": "process"})
except ValidationError as e:
    operations_failed.add(1, {"operation": "process", "error": "validation"})
    raise
except Exception as e:
    operations_failed.add(1, {"operation": "process", "error": "unknown"})
    raise

6. Performance Considerations

Metrics are lightweight:

  • Recording metrics is fast (microseconds)
  • Aggregation happens asynchronously
  • Export is batched

But avoid:

  • Recording in tight loops (aggregate first)
  • Creating instruments dynamically
  • Excessive attribute counts

# Bad: Recording in loop
for item in items:
    items_processed.add(1)  # 1000 calls

# Good: Aggregate first
items_processed.add(len(items))  # 1 call

Examples

Example 1: Task Creation Metrics

Full instrumentation of task creation:

# src/observability/metrics.py
from opentelemetry import metrics

meter = metrics.get_meter(__name__)

tasks_created = meter.create_counter(
    name="starter_app.tasks.created",
    description="Total tasks created",
    unit="1"
)

task_processing_time = meter.create_histogram(
    name="starter_app.task.processing_time",
    description="Time to process task operations",
    unit="ms"
)

# src/application/commands/create_task_command.py
import time
from observability.metrics import tasks_created, task_processing_time
from opentelemetry import trace

tracer = trace.get_tracer(__name__)

class CreateTaskCommandHandler:
    async def handle_async(
        self,
        command: CreateTaskCommand
    ) -> OperationResult[Task]:
        start_time = time.time()

        try:
            # Create task entity
            with tracer.start_as_current_span("create_task_entity") as span:
                task = Task(
                    title=command.title,
                    description=command.description,
                    priority=TaskPriority(command.priority),
                    status=TaskStatus(command.status),
                    assignee_id=command.assignee_id,
                    department=command.department,
                )
                span.set_attribute("task.priority", task.priority.value)
                span.set_attribute("task.status", task.status.value)

            # Save to repository
            saved_task = await self.task_repository.add_async(task)

            # Record metrics
            processing_time_ms = (time.time() - start_time) * 1000

            tasks_created.add(
                1,
                {
                    "priority": saved_task.state.priority.value,
                    "status": saved_task.state.status.value,
                    "has_assignee": bool(saved_task.state.assignee_id),
                    "has_department": bool(saved_task.state.department),
                }
            )

            task_processing_time.record(
                processing_time_ms,
                {"operation": "create", "priority": saved_task.state.priority.value}
            )

            return OperationResult.success(saved_task)

        except Exception as e:
            processing_time_ms = (time.time() - start_time) * 1000
            task_processing_time.record(
                processing_time_ms,
                {"operation": "create", "status": "error"}
            )
            raise

Example 2: API Request Metrics

Track API endpoint usage:

# src/observability/metrics.py
api_requests = meter.create_counter(
    name="starter_app.api.requests",
    description="Total API requests",
    unit="1"
)

api_request_duration = meter.create_histogram(
    name="starter_app.api.request.duration",
    description="API request duration",
    unit="ms"
)

# src/api/middleware.py
from fastapi import Request
import time
from observability.metrics import api_requests, api_request_duration

async def metrics_middleware(request: Request, call_next):
    start_time = time.time()

    # Process request
    response = await call_next(request)

    # Record metrics
    duration_ms = (time.time() - start_time) * 1000

    api_requests.add(
        1,
        {
            "method": request.method,
            "endpoint": request.url.path,
            "status_code": response.status_code,
        }
    )

    api_request_duration.record(
        duration_ms,
        {
            "method": request.method,
            "endpoint": request.url.path,
        }
    )

    return response

Example 3: Database Query Metrics

Monitor database performance:

# src/observability/metrics.py
db_queries = meter.create_counter(
    name="starter_app.database.queries",
    description="Total database queries",
    unit="1"
)

db_query_duration = meter.create_histogram(
    name="starter_app.database.query.duration",
    description="Database query duration",
    unit="ms"
)

# src/integration/repositories/motor_task_repository.py
import time
from observability.metrics import db_queries, db_query_duration

class MongoTaskRepository(TaskRepository):
    async def get_by_id_async(self, id: str) -> Task | None:
        start_time = time.time()

        try:
            result = await self._collection.find_one({"_id": id})

            duration_ms = (time.time() - start_time) * 1000
            db_queries.add(1, {"operation": "find_one", "collection": "tasks"})
            db_query_duration.record(
                duration_ms,
                {"operation": "find_one"}
            )

            return self._deserialize(result) if result else None

        except Exception as e:
            duration_ms = (time.time() - start_time) * 1000
            db_queries.add(
                1,
                {"operation": "find_one", "collection": "tasks", "status": "error"}
            )
            db_query_duration.record(
                duration_ms,
                {"operation": "find_one", "status": "error"}
            )
            raise

Example 4: Business Metrics

Track business-critical metrics:

# src/observability/metrics.py
tasks_by_department = meter.create_counter(
    name="starter_app.tasks.by_department",
    description="Tasks created by department",
    unit="1"
)

high_priority_tasks = meter.create_up_down_counter(
    name="starter_app.tasks.high_priority.active",
    description="Active high-priority tasks",
    unit="1"
)

task_completion_time = meter.create_histogram(
    name="starter_app.task.completion_time",
    description="Time from creation to completion",
    unit="h"
)

# Usage in handlers
tasks_by_department.add(1, {"department": task.department})

# When task is created
if task.priority == TaskPriority.HIGH:
    high_priority_tasks.add(1)

# When task is completed
if task.priority == TaskPriority.HIGH:
    high_priority_tasks.add(-1)

# Record completion time
completion_hours = (task.completed_at - task.created_at).total_seconds() / 3600
task_completion_time.record(completion_hours, {"priority": task.priority.value})

Troubleshooting

Metrics Not Appearing

1. Check exporter configuration:

# Should be 'otlp' or 'console'
echo $OTEL_METRICS_EXPORTER

# Should be set
echo $OTEL_EXPORTER_OTLP_ENDPOINT

2. Check collector is receiving metrics:

# View collector logs
docker-compose logs otel-collector | grep metrics

# Should see: "Metric {...}"

3. Add debug logging:

import logging
logging.basicConfig(level=logging.DEBUG)

tasks_created.add(1, {"priority": "high"})
print("Metric recorded")

High Cardinality Issues

Symptoms:

  • Slow queries
  • High memory usage
  • Storage costs increasing

Solution:

# Before: High cardinality
tasks_created.add(1, {
    "task_id": task.id,        # ❌ Millions of values
    "user_email": user.email,  # ❌ Thousands of values
})

# After: Low cardinality
tasks_created.add(1, {
    "priority": task.priority.value,  # ✅ 3 values
    "department": task.department,     # ✅ ~10 values
})

Metrics Not Aggregating

Problem: Seeing raw observations instead of aggregated metrics.

Cause: Using histogram for counting or counter for distributions.

Solution:

# Wrong: Using histogram for counting
task_count.record(1)  # ❌

# Right: Use counter for counting
tasks_created.add(1)  # ✅

# Wrong: Using counter for latency
request_time.add(123)  # ❌

# Right: Use histogram for latency
request_duration.record(123)  # ✅

Additional Resources