Mastering MCP · Day 25 of 30

Monitoring, Logging & Observability for MCP

Instrument your MCP servers with structured logging, correlation IDs, CloudWatch Embedded Metrics, X-Ray distributed tracing, operational dashboards, and automated alarms — so you know what's happening before your users do.

📅 Day 25
⏳ ~34 min read
🎯 Level ASCEND
🚀 Phase Observability

The Three Pillars of Observability for AI Tool Servers

Observability is not the same as monitoring. Monitoring tells you when something is broken. Observability lets you understand why it broke — and predict breakage before it happens. For MCP servers, observability is especially critical because a silent failure in a tool call can cause an AI agent to hallucinate or loop indefinitely without any visible error.

📝 Logs: what happened
Structured JSON log lines with correlation IDs. Answer "what happened?" by reconstructing the exact sequence of events for any session or tool call. Stored in CloudWatch Logs, queried with Logs Insights.

📈 Metrics: how healthy
Numeric time-series data: tool call rate, error rate, p99 latency. Answer "how healthy is the system?" Drives dashboards and alarms. Emitted via CloudWatch EMF, which turns log lines into custom metrics without PutMetricData calls.

🔍 Traces: where the time goes
End-to-end request flow across services. Answer "where is the time going?" AWS X-Ray and OpenTelemetry show latency at each hop: client → MCP server → tool → database → response.

For MCP servers, the three pillars have unique characteristics. A single agent interaction may spawn dozens of sequential tool calls across multiple MCP servers — the trace graph can be 10–20 hops deep. Without distributed tracing, debugging a slow agent response means manually correlating log lines from multiple servers. A correlation ID threaded through every log line, metric, and trace span is the connective tissue that makes the system debuggable.

ℹ️
MCP-specific failure modes to observe: Beyond the standard web server metrics, watch for tool call retry storms (clients retrying failed tools in tight loops), session map growth (memory leak indicator), and token budget exhaustion (when tool responses are truncated due to context limits). These are MCP-specific anti-patterns that generic monitoring won't surface.

Structured Logging with Correlation IDs

Every log line your MCP server emits should be a JSON object. Structured logs can be queried, aggregated, and correlated programmatically; plain text log lines cannot. The correlation ID is the single most important field. It binds together every log line produced while handling a request, on every server that receives the same ID. Paired with the MCP session ID, it lets you reconstruct the full history of any interaction.

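Python — structured logging middleware with correlation IDs
import contextvars, time, uuid

import structlog
from fastmcp import FastMCP
from starlette.middleware import Middleware
from starlette.middleware.base import BaseHTTPMiddleware

# Render every log line as one JSON object on stdout (what CloudWatch Logs ingests)
structlog.configure(processors=[
    structlog.contextvars.merge_contextvars,                # adds correlation_id / session_id
    structlog.processors.add_log_level,                     # "level": "info"
    structlog.processors.TimeStamper(fmt="iso", utc=True),  # "timestamp": "...Z"
    structlog.processors.format_exc_info,                   # renders exc_info as a traceback string
    structlog.processors.JSONRenderer(),
])

# Context var: isolated per request, flows through async code automatically
correlation_id: contextvars.ContextVar[str] = contextvars.ContextVar("correlation_id", default="")

class CorrelationMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        # Propagate from upstream or generate new ID
        cid = (
            request.headers.get("x-correlation-id") or
            request.headers.get("x-amzn-trace-id") or
            str(uuid.uuid4())
        )
        correlation_id.set(cid)
        structlog.contextvars.clear_contextvars()  # never inherit fields from another request
        structlog.contextvars.bind_contextvars(
            correlation_id=cid,
            session_id=request.headers.get("mcp-session-id", "none"),
        )
        response = await call_next(request)
        response.headers["x-correlation-id"] = cid
        return response

mcp = FastMCP("documents-server")
log = structlog.get_logger().bind(logger=__name__)

# Usage — every log line automatically includes correlation_id + session_id
@mcp.tool()
async def search_documents(query: str, limit: int = 10) -> list:
    start = time.monotonic()
    log.info("tool.start", tool="search_documents", query=query[:100], limit=limit)
    try:
        results = await _do_search(query, limit)  # _do_search: your search backend (not shown)
        duration_ms = (time.monotonic() - start) * 1000
        log.info("tool.success", tool="search_documents",
                 duration_ms=round(duration_ms, 2), result_count=len(results))
        return results
    except Exception as e:
        log.error("tool.error", tool="search_documents", error=str(e), exc_info=True)
        raise

# ASGI app with the middleware attached (serve it with e.g. uvicorn)
app = mcp.http_app(middleware=[Middleware(CorrelationMiddleware)])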

Each log line CloudWatch receives looks like this:

JSON — example structured log line in CloudWatch
{
  "timestamp": "2025-05-18T14:32:01.234Z",
  "level": "info",
  "event": "tool.success",
  "tool": "search_documents",
  "correlation_id": "a3f8b2c1-4d5e-6f7a-8b9c-0d1e2f3a4b5c",
  "session_id": "sess_01HX...",
  "duration_ms": 47.3,
  "result_count": 8,
  "logger": "src.tools.search"
}
💡
CloudWatch Logs Insights query: fields @timestamp, tool, duration_ms, correlation_id | filter ispresent(tool) and event = "tool.success" | stats avg(duration_ms) as avg_ms, pct(duration_ms, 99) as p99_ms by tool | sort p99_ms desc. This instantly shows which tools have the worst p99 latency; the aliases are needed because sort can only reference the aggregated columns by name.

Metrics to Track for Production MCP Servers

Not all metrics are equal. Focus on the metrics that directly correlate with user experience and system health. For MCP servers, five metric families matter most.

| Metric | Unit | Alert threshold | Why it matters |
|---|---|---|---|
| Tool call rate | calls/sec | > 80% of capacity | Primary load indicator; a spike usually means an agent loop or an external traffic surge |
| Tool error rate | % | > 1% (warn) / > 5% (critical) | Direct signal of tool failures; sustained > 5% means agent workflows are broken |
| p50 tool latency | ms | SLA-dependent | Median experience; regressions here affect all users |
| p95 tool latency | ms | > 500 ms | Tail latency: 5% of calls are slower than this; agent loops amplify it |
| p99 tool latency | ms | > 2000 ms | Worst-case experience; page if sustained for > 5 min |
| Active SSE sessions | count | Near max task capacity | Session leak detection; should track the number of active users |
| Token usage per tool | tokens | Approaching context limit | Cost control and context-window budget awareness |
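To make the latency and error-rate rows concrete, here is how their thresholds could be evaluated over a window of raw samples. This is a sketch using the nearest-rank percentile definition; CloudWatch computes percentiles server-side with its own method, so its numbers may differ slightly. The function names are hypothetical.

```python
import math


def percentile(samples: list[float], p: int) -> float:
    """Nearest-rank percentile: smallest sample with at least p% of samples at or below it."""
    if not samples:
        raise ValueError("need at least one sample")
    ordered = sorted(samples)
    rank = max(1, math.ceil(p * len(ordered) / 100))  # 1-based rank
    return ordered[rank - 1]


def error_rate_level(errors: int, calls: int) -> str:
    """Classify the error rate using the table's 1% warn / 5% critical thresholds."""
    if calls == 0:
        return "no-data"
    rate = 100 * errors / calls
    if rate > 5:
        return "critical"
    if rate > 1:
        return "warn"
    return "ok"


def latency_breaches(samples_ms: list[float]) -> list[str]:
    """Return which of the table's latency thresholds are exceeded."""
    breaches = []
    if percentile(samples_ms, 95) > 500:
        breaches.append("p95 > 500 ms")
    if percentile(samples_ms, 99) > 2000:
        breaches.append("p99 > 2000 ms")
    return breaches
```

Note how a single 3-second outlier in 100 calls does not move p99 under this definition, while nine 800 ms calls are enough to breach the p95 threshold.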

CloudWatch Embedded Metrics Format (EMF)

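CloudWatch EMF lets you emit custom metrics as structured log lines: no PutMetricData API calls, no PutMetricData request charges, and no API throttling to batch around. You write a specially formatted JSON object to stdout, and CloudWatch Logs extracts the metrics when it ingests the line. On Lambda, stdout is enough; on ECS or EC2 the lines typically travel through the CloudWatch agent (or another EMF-aware log pipeline) so CloudWatch recognizes the format. You still pay for log ingestion and for each custom metric, so EMF is simpler and cheaper, not free.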

The EMF format embeds metric definitions inside a _aws key within your log JSON. CloudWatch Logs ingests the line as a normal log entry AND simultaneously creates metric data points from the embedded metric definitions. This means your structured logs and your metrics come from the same source — no synchronization lag between what logs say happened and what metrics report.

Python — EMF metric emission for MCP tools
import json, sys, time
from typing import Any

class EMFLogger:
    """Emit CloudWatch Embedded Metrics Format (EMF) log lines to stdout."""

    def __init__(self, namespace: str, service: str,
                 rollups: list[list[str]] | None = None):
        self.namespace = namespace
        self.service = service
        # Extra dimension sets published alongside the full set. CloudWatch treats each
        # dimension combination as a separate metric, so an alarm on {Environment} alone
        # only sees data if that combination is emitted too.
        self.rollups = rollups if rollups is not None else [["Environment"]]

    def emit(self, metrics: dict[str, float], dimensions: dict[str, str],
             properties: dict[str, Any] | None = None) -> None:
        metric_definitions = [
            {"Name": k, "Unit": "Count" if k.endswith("_count") else "Milliseconds"}
            for k in metrics
        ]
        full = list(dimensions)
        dimension_sets = [full] + [r for r in self.rollups
                                   if r != full and all(d in dimensions for d in r)]
        log_entry = {
            "_aws": {
                "Timestamp": int(time.time() * 1000),  # epoch milliseconds
                "CloudWatchMetrics": [{
                    "Namespace": self.namespace,
                    "Dimensions": dimension_sets,
                    "Metrics": metric_definitions,
                }]
            },
            "Service": self.service,
            **dimensions,           # dimension values: top-level string fields
            **metrics,              # metric values: top-level numeric fields
            **(properties or {}),   # extra context: searchable in Logs Insights, not metrics
        }
        print(json.dumps(log_entry), file=sys.stdout, flush=True)

# ── Usage in a tool handler ────────────────────────────────────────────
emf = EMFLogger(namespace="MCP/ToolServer", service="documents-server")

async def search_documents(query: str) -> list:
    start = time.monotonic()
    error_count = 0
    try:
        return await _do_search(query)  # _do_search: your search backend (not shown)
    except Exception:
        error_count = 1
        raise
    finally:
        # Runs on success and failure, so every call is counted exactly once
        duration_ms = (time.monotonic() - start) * 1000
        emf.emit(
            metrics={
                "ToolDuration": duration_ms,
                "tool_error_count": error_count,
                "tool_call_count": 1,
            },
            dimensions={"ToolName": "search_documents", "Environment": "production"},
            properties={"query_length": len(query)},
        )

Distributed Tracing: X-Ray + OpenTelemetry for MCP

When Claude calls three MCP servers in sequence to complete a task, and the task is slow, you need to know which server and which tool is the bottleneck. Distributed tracing provides a flame graph view of the entire call chain — from the initial MCP request through tool execution, downstream API calls, and database queries.

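Python — OpenTelemetry + X-Ray tracing for MCP tools
import functools

from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.aiohttp_client import AioHttpClientInstrumentor
from opentelemetry.propagators.aws import AwsXRayPropagator
from opentelemetry.sdk.extension.aws.trace import AwsXRayIdGenerator
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
import opentelemetry.propagate as propagate

# ── Configure OTel → X-Ray ──────────────────────────────────────────
def setup_tracing(service_name: str):
    provider = TracerProvider(
        resource=Resource.create({"service.name": service_name}),  # node name in the X-Ray service map
        id_generator=AwsXRayIdGenerator(),  # X-Ray-compatible (timestamp-prefixed) trace IDs
    )
    # Export to AWS Distro for OpenTelemetry (ADOT) collector sidecar
    exporter = OTLPSpanExporter(endpoint="http://localhost:4317")
    provider.add_span_processor(BatchSpanProcessor(exporter))
    trace.set_tracer_provider(provider)
    # Read/write the X-Amzn-Trace-Id header so traces join up across AWS services
    propagate.set_global_textmap(AwsXRayPropagator())
    # Auto-instrument the aiohttp client so downstream HTTP calls become child spans
    AioHttpClientInstrumentor().instrument()

# Proxy tracer: starts recording once setup_tracing() has installed the provider
tracer = trace.get_tracer("mcp-server")

# ── Decorator: trace any MCP tool ──────────────────────────────────
# Usage: call setup_tracing("documents-server") once at startup, then put
# @traced("search_documents") directly under @mcp.tool()
def traced(tool_name: str):
    def decorator(fn):
        @functools.wraps(fn)  # keep the signature so the MCP framework can build the tool schema
        async def wrapper(*args, **kwargs):
            # Exceptions are recorded explicitly below, so disable the automatic handling
            with tracer.start_as_current_span(f"mcp.tool.{tool_name}",
                                              record_exception=False,
                                              set_status_on_exception=False) as span:
                span.set_attribute("mcp.tool.name", tool_name)
                span.set_attribute("mcp.tool.input_keys", list(kwargs.keys()))
                try:
                    result = await fn(*args, **kwargs)
                    span.set_status(trace.Status(trace.StatusCode.OK))
                    return result
                except Exception as e:
                    span.record_exception(e)
                    span.set_status(trace.Status(trace.StatusCode.ERROR, str(e)))
                    raise
        return wrapper
    return decorator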
💡
ADOT sidecar in ECS: Add the public.ecr.aws/aws-observability/aws-otel-collector:latest container as a sidecar in your ECS task definition. Configure it with an otel-config.yaml that receives OTLP on port 4317 and exports to AWS X-Ray. This requires no X-Ray SDK in your application — just standard OpenTelemetry APIs.
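A minimal otel-config.yaml with that shape could look like the output of the function below. It is generated from Python here to keep to one language; since JSON is valid YAML, the printed output can be saved directly as the config file. This is a sketch assuming the OTLP receiver, batch processor, and awsxray exporter bundled with the ADOT collector; the region default is an assumption, and how the file reaches the sidecar (baked into an image, mounted, or injected) is left to your deployment.

```python
import json


def collector_config(region: str = "us-east-1") -> dict:
    """ADOT collector pipeline: OTLP/gRPC in on port 4317, batched, exported to X-Ray."""
    return {
        "receivers": {
            # 0.0.0.0 accepts spans from the other containers in the task
            "otlp": {"protocols": {"grpc": {"endpoint": "0.0.0.0:4317"}}},
        },
        "processors": {"batch": {}},  # buffer spans and export them in batches
        "exporters": {"awsxray": {"region": region}},
        "service": {
            "pipelines": {
                "traces": {
                    "receivers": ["otlp"],
                    "processors": ["batch"],
                    "exporters": ["awsxray"],
                }
            }
        },
    }


if __name__ == "__main__":
    # Redirect to a file, e.g. python collector_config.py > otel-config.yaml
    print(json.dumps(collector_config(), indent=2))
```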

CloudWatch Dashboard, Alarms & SNS Notifications

Metrics are only useful if someone sees them. A well-designed CloudWatch dashboard gives your team an at-a-glance health view of all MCP servers. Alarms with SNS notifications ensure someone is paged when thresholds are breached — before users start filing bug reports.

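Python (boto3) — create SNS alert topic + MCP alarms
import boto3

cw = boto3.client("cloudwatch", region_name="us-east-1")
sns = boto3.client("sns", region_name="us-east-1")

NAMESPACE = "MCP/ToolServer"
# Alarms query the Environment-only dimension set, so the EMF lines must publish it too
ENV_DIM = [{"Name": "Environment", "Value": "production"}]

# ── SNS topic for MCP alerts (create_topic is idempotent) ─────────
topic_arn = sns.create_topic(Name="mcp-alerts")["TopicArn"]
# A topic with no subscriptions notifies nobody; subscribe on-call, e.g. (address hypothetical):
# sns.subscribe(TopicArn=topic_arn, Protocol="email", Endpoint="oncall@example.com")

def _sum(metric_id: str, metric_name: str) -> dict:
    """Metric-math input: 5-minute Sum of one EMF metric (not itself returned as alarm data)."""
    return {"Id": metric_id, "ReturnData": False,
            "MetricStat": {"Metric": {"Namespace": NAMESPACE, "MetricName": metric_name,
                                      "Dimensions": ENV_DIM},
                           "Period": 300, "Stat": "Sum"}}

# ── Alarm: tool error RATE > 5% (errors / calls, not a raw error count) ──
cw.put_metric_alarm(
    AlarmName="MCP-HighToolErrorRate",
    AlarmDescription="MCP tool error rate exceeds 5% for 5 minutes",
    Metrics=[
        _sum("errors", "tool_error_count"),
        _sum("calls", "tool_call_count"),
        {"Id": "error_rate", "Expression": "100 * errors / calls",
         "Label": "Tool error rate (%)", "ReturnData": True},
    ],
    EvaluationPeriods=1,
    Threshold=5.0,
    ComparisonOperator="GreaterThanThreshold",
    TreatMissingData="notBreaching",
    AlarmActions=[topic_arn],
    OKActions=[topic_arn],
)

# ── Alarm: p99 latency > 2s for two consecutive 5-minute periods ──
cw.put_metric_alarm(
    AlarmName="MCP-HighP99Latency",
    AlarmDescription="MCP p99 tool latency exceeds 2000 ms for 10 minutes",
    Namespace=NAMESPACE,
    MetricName="ToolDuration",
    Dimensions=ENV_DIM,
    Period=300,
    EvaluationPeriods=2,
    ExtendedStatistic="p99",
    Threshold=2000.0,
    ComparisonOperator="GreaterThanThreshold",
    TreatMissingData="notBreaching",
    AlarmActions=[topic_arn],
)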
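Alarms cover paging; for the at-a-glance view, a dashboard can be defined as JSON and published with put_dashboard. This is a sketch assuming the MCP/ToolServer namespace and that the metrics are also published with an Environment-only dimension set (a metric emitted only with ToolName and Environment would not show up here). The dashboard name, helper names, and widget layout are hypothetical.

```python
import json

NAMESPACE = "MCP/ToolServer"  # namespace the EMF log lines write to
REGION = "us-east-1"


def _widget(title: str, metrics: list, x: int, y: int, stat: str = "Sum") -> dict:
    """One 12x6 time-series metric widget in CloudWatch dashboard-body format."""
    return {
        "type": "metric", "x": x, "y": y, "width": 12, "height": 6,
        "properties": {"title": title, "region": REGION, "view": "timeSeries",
                       "stat": stat, "period": 300, "metrics": metrics},
    }


def dashboard_body(environment: str = "production") -> dict:
    dim = ["Environment", environment]  # assumes an Environment-only dimension set exists
    return {"widgets": [
        _widget("Tool calls", [[NAMESPACE, "tool_call_count", *dim]], x=0, y=0),
        _widget("Tool error rate (%)", [
            # Metric math: hidden inputs "errors" and "calls", visible ratio "rate"
            [{"expression": "100 * errors / calls", "label": "Error rate (%)", "id": "rate"}],
            [NAMESPACE, "tool_error_count", *dim, {"id": "errors", "visible": False}],
            [NAMESPACE, "tool_call_count", *dim, {"id": "calls", "visible": False}],
        ], x=12, y=0),
        _widget("Tool latency p50 / p99 (ms)", [
            [NAMESPACE, "ToolDuration", *dim, {"stat": "p50"}],
            [NAMESPACE, "ToolDuration", *dim, {"stat": "p99"}],
        ], x=0, y=6, stat="p99"),
    ]}


def publish_dashboard(name: str = "MCP-Health") -> None:
    import boto3  # deferred so dashboard_body() can be built and checked without AWS access
    cw = boto3.client("cloudwatch", region_name=REGION)
    cw.put_dashboard(DashboardName=name, DashboardBody=json.dumps(dashboard_body()))
```

Keeping the body in a pure function makes it easy to diff and review in version control before publishing.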
⚠️
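Composite alarms reduce noise: Use put_composite_alarm to combine the error rate alarm and the latency alarm with an AND rule, and page only when both are breached at the same time. This filters out false positives from brief latency spikes that don't affect error rates, which significantly reduces alert fatigue. Route notifications through the composite alarm and remove AlarmActions from the two child alarms; otherwise each child still pages on its own.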
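A sketch of the composite alarm described in the note, assuming the two child alarm names used earlier in this section. The composite's name and the helper names are hypothetical; the rule string uses CloudWatch's ALARM("name") syntax.

```python
def composite_rule(*child_alarms: str) -> str:
    """Build an AlarmRule that is in ALARM only when every child alarm is."""
    return " AND ".join(f'ALARM("{name}")' for name in child_alarms)


def create_composite_alarm(topic_arn: str, region: str = "us-east-1") -> None:
    import boto3  # deferred so composite_rule() can be tested without AWS access
    cw = boto3.client("cloudwatch", region_name=region)
    cw.put_composite_alarm(
        AlarmName="MCP-ToolServerDegraded",  # hypothetical name
        AlarmDescription="Tool error rate AND p99 latency are both breached",
        AlarmRule=composite_rule("MCP-HighToolErrorRate", "MCP-HighP99Latency"),
        ActionsEnabled=True,
        AlarmActions=[topic_arn],
        OKActions=[topic_arn],
    )
```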
Knowledge Check
1. What is the primary advantage of CloudWatch Embedded Metrics Format (EMF) over calling the PutMetricData API directly?
2. Why is a correlation ID essential for debugging MCP server issues specifically (beyond general web services)?
3. What does the AWS Distro for OpenTelemetry (ADOT) collector sidecar do in an ECS MCP deployment?
4. Why should you use a composite alarm (combining error rate AND latency) instead of two separate alarms for MCP health monitoring?