Instrument your MCP servers with structured logging, correlation IDs, CloudWatch Embedded Metrics, X-Ray distributed tracing, operational dashboards, and automated alarms — so you know what's happening before your users do.
Observability is not the same as monitoring. Monitoring tells you when something is broken. Observability lets you understand why it broke — and predict breakage before it happens. For MCP servers, observability is especially critical because a silent failure in a tool call can cause an AI agent to hallucinate or loop indefinitely without any visible error.
For MCP servers, the three pillars of observability (logs, metrics, and traces) take on particular characteristics. A single agent interaction may spawn dozens of sequential tool calls across multiple MCP servers, so the trace graph can be 10–20 hops deep. Without distributed tracing, debugging a slow agent response means manually correlating log lines from multiple servers. A correlation ID threaded through every log line, metric, and trace span is the connective tissue that makes the system debuggable.
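Threading only works if every hop forwards the ID it received instead of minting a new one. The stdlib-only sketch below shows the pattern with a context variable; it assumes downstream services honor an `x-correlation-id` header, and `request_scope` and `outbound_headers` are hypothetical helpers, not part of any MCP library.

```python
import contextvars
import uuid
from contextlib import contextmanager
from typing import Iterator, Optional

# One value per request/asyncio task; child tasks inherit a copy
correlation_id: contextvars.ContextVar[str] = contextvars.ContextVar("correlation_id", default="")


@contextmanager
def request_scope(incoming_id: Optional[str] = None) -> Iterator[str]:
    """Bind a correlation ID for one request: reuse the upstream ID or mint one."""
    cid = incoming_id or str(uuid.uuid4())
    token = correlation_id.set(cid)
    try:
        yield cid
    finally:
        # Restore the previous value so IDs never leak between requests
        correlation_id.reset(token)


def outbound_headers(extra: Optional[dict[str, str]] = None) -> dict[str, str]:
    """Headers for a downstream HTTP call, carrying the current correlation ID."""
    headers = dict(extra or {})
    cid = correlation_id.get()
    if cid:
        headers["x-correlation-id"] = cid
    return headers
```

Any HTTP client call made inside a tool would pass `headers=outbound_headers()`, so the next server sees the same ID and logs it on every line.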
Every log line your MCP server emits should be a JSON object. Structured logs can be queried, aggregated, and correlated programmatically; plain-text log lines cannot. The correlation ID is the single most important field: it binds together every log line produced while handling a request (and, when forwarded downstream, across every server that request touches), so you can reconstruct the full history of any interaction. A session ID alongside it groups the requests that belong to the same MCP session.
Python — structured logging middleware with correlation IDs

```python
import contextvars
import time
import uuid

import structlog
from starlette.middleware.base import BaseHTTPMiddleware

# Render every log line as one JSON object. merge_contextvars pulls in the
# correlation_id/session_id bound by the middleware below.
structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso", utc=True),
        structlog.processors.format_exc_info,
        structlog.processors.JSONRenderer(),
    ],
)

# Context var: isolated per asyncio task, so it flows through async code safely
correlation_id: contextvars.ContextVar[str] = contextvars.ContextVar("correlation_id", default="")


class CorrelationMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        # Propagate an upstream ID if present, otherwise generate a new one
        cid = (
            request.headers.get("x-correlation-id")
            or request.headers.get("x-amzn-trace-id")
            or str(uuid.uuid4())
        )
        correlation_id.set(cid)
        # Drop anything left in this context before binding request fields
        structlog.contextvars.clear_contextvars()
        structlog.contextvars.bind_contextvars(
            correlation_id=cid,
            session_id=request.headers.get("mcp-session-id", "none"),
        )
        response = await call_next(request)
        response.headers["x-correlation-id"] = cid
        return response


# Usage: every log line automatically includes correlation_id + session_id
log = structlog.get_logger()


async def search_documents(query: str, limit: int = 10):
    start = time.monotonic()
    log.info("tool.start", tool="search_documents", query=query[:100], limit=limit)
    try:
        results = await _do_search(query, limit)  # your search backend
        duration_ms = (time.monotonic() - start) * 1000
        log.info("tool.success", tool="search_documents",
                 duration_ms=round(duration_ms, 2), result_count=len(results))
        return results
    except Exception as e:
        log.error("tool.error", tool="search_documents", error=str(e), exc_info=True)
        raise
```
Each log line CloudWatch receives looks like this:
JSON — example structured log line in CloudWatch

```json
{
  "timestamp": "2025-05-18T14:32:01.234Z",
  "level": "info",
  "event": "tool.success",
  "tool": "search_documents",
  "correlation_id": "a3f8b2c1-4d5e-6f7a-8b9c-0d1e2f3a4b5c",
  "session_id": "sess_01HX...",
  "duration_ms": 47.3,
  "result_count": 8,
  "logger": "src.tools.search"
}
```
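Because every line is a self-describing JSON object, the per-tool latency aggregation a log query performs can also be reproduced offline, for example over an exported log file during an incident review. The sketch below assumes the field names shown above and uses a simple nearest-rank percentile; CloudWatch's own percentile computation may differ slightly, and `summarize_latency` is a hypothetical helper.

```python
import json
import math
from collections import defaultdict
from typing import Iterable


def nearest_rank(sorted_values: list[float], pct: float) -> float:
    """Nearest-rank percentile of an already sorted, non-empty list."""
    rank = max(1, math.ceil(pct * len(sorted_values) / 100))
    return sorted_values[rank - 1]


def summarize_latency(lines: Iterable[str]) -> dict[str, dict[str, float]]:
    """Count, average and p99 duration_ms per tool from 'tool.success' lines."""
    durations: dict[str, list[float]] = defaultdict(list)
    for line in lines:
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip non-JSON lines such as runtime banners
        if record.get("event") == "tool.success" and "tool" in record and "duration_ms" in record:
            durations[record["tool"]].append(float(record["duration_ms"]))
    summary = {}
    for tool, values in durations.items():
        values.sort()
        summary[tool] = {
            "count": len(values),
            "avg_ms": sum(values) / len(values),
            "p99_ms": nearest_rank(values, 99),
        }
    # Worst p99 first, like sorting a query result by p99 descending
    return dict(sorted(summary.items(), key=lambda kv: kv[1]["p99_ms"], reverse=True))
```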
Tip: this CloudWatch Logs Insights query shows which tools have the worst p99 latency:

```
fields @timestamp, tool, duration_ms, correlation_id
| filter ispresent(tool) and event = "tool.success"
| stats avg(duration_ms) as avg_ms, pct(duration_ms, 99) as p99_ms by tool
| sort p99_ms desc
```

Not all metrics are equal. Focus on the metrics that directly correlate with user experience and system health. For MCP servers, five metric families matter most: call rate, error rate, latency, active sessions, and token usage.
| Metric | Unit | Alert Threshold | Why It Matters |
|---|---|---|---|
| Tool call rate | calls/sec | >80% capacity | Primary load indicator; spike = agent loop or external traffic surge |
| Tool error rate | % | >1% (warn) / >5% (critical) | Direct signal of tool failures; sustained >5% means agent workflows are broken |
| p50 tool latency | ms | SLA dependent | Median experience; regressions here affect all users |
| p95 tool latency | ms | >500 ms | Tail latency: the slowest 5% of calls take at least this long; agent loops amplify it |
| p99 tool latency | ms | >2000 ms | Worst-case experience; page if sustained for >5 min |
| Active SSE sessions | count | Near max task capacity | Session leak detection; should correlate with active users |
| Token usage per tool | tokens | Approaching context limit | Cost control + context window budget awareness |
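The error-rate and latency thresholds in the table translate directly into a severity check. The sketch below is a hypothetical helper, for example for a synthetic health probe or a local load test. It assumes the error rate is errors divided by calls over the same window, uses the table's 1% / 5% and 500 ms / 2000 ms values, and treats a p95 breach as a warning, which is an assumption since the table does not assign it a severity.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class WindowStats:
    """Aggregates for one evaluation window (e.g. 5 minutes) of one server."""
    calls: int
    errors: int
    p95_ms: float
    p99_ms: float


def error_rate_pct(stats: WindowStats) -> float:
    # No traffic means no evidence of failure (like a notBreaching alarm)
    return 0.0 if stats.calls == 0 else 100.0 * stats.errors / stats.calls


def severity(stats: WindowStats) -> str:
    """Map a window to 'ok', 'warn' or 'critical' using the table's thresholds."""
    rate = error_rate_pct(stats)
    if rate > 5.0 or stats.p99_ms > 2000.0:
        return "critical"
    if rate > 1.0 or stats.p95_ms > 500.0:
        return "warn"
    return "ok"
```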
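CloudWatch Embedded Metric Format (EMF) lets you emit custom metrics as structured log lines: no PutMetricData API calls, no PutMetricData request charges, and no client-side batching or API throttling to manage. You write a specially formatted JSON object to stdout, and once that output reaches CloudWatch Logs through a path that supports EMF (such as the Lambda runtime or the CloudWatch agent), CloudWatch extracts the metrics automatically. You still pay for log ingestion and for the custom metrics themselves.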
EMF embeds metric definitions inside an `_aws` key in your log JSON. CloudWatch Logs ingests the line as a normal log entry and simultaneously creates metric data points from the embedded definitions. Your structured logs and your metrics therefore come from the same source, so there is no synchronization lag between what the logs say happened and what the metrics report.
Python — EMF metric emission for MCP tools

```python
import json
import sys
import time
from typing import Any


class EMFLogger:
    """Emit CloudWatch Embedded Metric Format log lines."""

    def __init__(self, namespace: str, service: str):
        self.namespace = namespace
        self.service = service

    def emit(self, metrics: dict[str, float], dimensions: dict[str, str],
             properties: dict[str, Any] | None = None):
        metric_definitions = [
            {"Name": k, "Unit": "Count" if k.endswith("_count") else "Milliseconds"}
            for k in metrics
        ]
        # Full dimension set (e.g. per tool) plus an Environment-only rollup,
        # so fleet-wide alarms on {Environment} have a matching metric.
        dimension_sets = [list(dimensions.keys())]
        if "Environment" in dimensions and len(dimensions) > 1:
            dimension_sets.append(["Environment"])
        log_entry = {
            "_aws": {
                "Timestamp": int(time.time() * 1000),
                "CloudWatchMetrics": [{
                    "Namespace": self.namespace,
                    "Dimensions": dimension_sets,
                    "Metrics": metric_definitions,
                }],
            },
            "Service": self.service,
            **dimensions,
            **metrics,
            **(properties or {}),
        }
        print(json.dumps(log_entry), file=sys.stdout, flush=True)


# ── Usage in a tool handler ────────────────────────────────────────────
emf = EMFLogger(namespace="MCP/ToolServer", service="documents-server")


async def search_documents(query: str) -> list:
    start = time.monotonic()
    error_count = 0
    try:
        return await _do_search(query)  # your search backend
    except Exception:
        error_count = 1
        raise
    finally:
        # Runs on success and failure, so every call is counted exactly once
        duration_ms = (time.monotonic() - start) * 1000
        emf.emit(
            metrics={
                "ToolDuration": duration_ms,
                "tool_error_count": error_count,
                "tool_call_count": 1,
            },
            dimensions={"ToolName": "search_documents", "Environment": "production"},
            properties={"query_length": len(query)},
        )
```
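Repeating the try/except/finally block in every handler is easy to get wrong. One option is to move it into a decorator so each tool gets identical duration, call-count, and error-count metrics. The self-contained sketch below writes EMF lines in the same shape as the class above; `emf_metrics`, `emit_emf`, and the namespace default are hypothetical names.

```python
import functools
import json
import sys
import time


def emit_emf(namespace: str, metrics: dict, units: dict, dimensions: dict) -> None:
    """Write one EMF line with the full dimension set plus an Environment rollup."""
    dimension_sets = [list(dimensions)]
    if "Environment" in dimensions and len(dimensions) > 1:
        dimension_sets.append(["Environment"])
    line = {
        "_aws": {
            "Timestamp": int(time.time() * 1000),
            "CloudWatchMetrics": [{
                "Namespace": namespace,
                "Dimensions": dimension_sets,
                "Metrics": [{"Name": k, "Unit": units[k]} for k in metrics],
            }],
        },
        **dimensions,
        **metrics,
    }
    print(json.dumps(line), file=sys.stdout, flush=True)


def emf_metrics(environment: str, namespace: str = "MCP/ToolServer"):
    """Decorator: emit duration, call count and error count for an async tool."""
    def decorator(fn):
        @functools.wraps(fn)  # keep the tool's name and signature visible
        async def wrapper(*args, **kwargs):
            start = time.monotonic()
            failed = False
            try:
                return await fn(*args, **kwargs)
            except Exception:
                failed = True
                raise
            finally:
                emit_emf(
                    namespace,
                    metrics={
                        "ToolDuration": (time.monotonic() - start) * 1000,
                        "tool_call_count": 1,
                        "tool_error_count": int(failed),
                    },
                    units={"ToolDuration": "Milliseconds",
                           "tool_call_count": "Count", "tool_error_count": "Count"},
                    dimensions={"ToolName": fn.__name__, "Environment": environment},
                )
        return wrapper
    return decorator
```

Applied as `@emf_metrics(environment="production")` above `async def search_documents(...)`, the handler body shrinks to the search call itself, and the tool name dimension comes from the function name.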
When Claude calls three MCP servers in sequence to complete a task, and the task is slow, you need to know which server and which tool is the bottleneck. Distributed tracing provides a flame graph view of the entire call chain — from the initial MCP request through tool execution, downstream API calls, and database queries.
Python — OpenTelemetry + X-Ray tracing for MCP tools

```python
import functools

import opentelemetry.propagate as propagate
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.aiohttp_client import AioHttpClientInstrumentor
from opentelemetry.propagators.aws import AwsXRayPropagator
from opentelemetry.sdk.extension.aws.trace import AwsXRayIdGenerator
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.trace import Status, StatusCode


# ── Configure OTel → X-Ray ──────────────────────────────────────────
def setup_tracing(service_name: str):
    provider = TracerProvider(
        resource=Resource.create({"service.name": service_name}),
        # X-Ray-compatible trace IDs (first 32 bits encode the start time)
        id_generator=AwsXRayIdGenerator(),
    )
    # Export to the AWS Distro for OpenTelemetry (ADOT) collector sidecar
    exporter = OTLPSpanExporter(endpoint="http://localhost:4317", insecure=True)
    provider.add_span_processor(BatchSpanProcessor(exporter))
    trace.set_tracer_provider(provider)
    # Propagate context downstream in X-Ray's X-Amzn-Trace-Id header format
    propagate.set_global_textmap(AwsXRayPropagator())
    # Auto-instrument the aiohttp HTTP client
    AioHttpClientInstrumentor().instrument()


tracer = trace.get_tracer("mcp-server")


# ── Decorator: trace any MCP tool ──────────────────────────────────
def traced(tool_name: str):
    def decorator(fn):
        @functools.wraps(fn)  # keep the signature for frameworks that inspect it
        async def wrapper(*args, **kwargs):
            # Exceptions are recorded manually below, so disable the automatic
            # recording to avoid duplicate exception events on the span
            with tracer.start_as_current_span(
                f"mcp.tool.{tool_name}",
                record_exception=False,
                set_status_on_exception=False,
            ) as span:
                span.set_attribute("mcp.tool.name", tool_name)
                span.set_attribute("mcp.tool.input_keys", list(kwargs.keys()))
                try:
                    result = await fn(*args, **kwargs)
                    span.set_status(Status(StatusCode.OK))
                    return result
                except Exception as e:
                    span.record_exception(e)
                    span.set_status(Status(StatusCode.ERROR, str(e)))
                    raise
        return wrapper
    return decorator
```
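Traces become far more useful when each log line carries the trace ID, so you can jump from a slow trace to its log lines. X-Ray displays trace IDs as `1-<8 hex digits>-<24 hex digits>`, while OpenTelemetry stores the same 128-bit ID as an integer. The sketch below converts between the two and wraps the conversion in a structlog-style processor; `make_trace_id_processor` and the `xray_trace_id` field name are hypothetical, and the trace-ID getter is injected so the snippet runs without OpenTelemetry installed.

```python
from typing import Any, Callable, MutableMapping


def to_xray_trace_id(otel_trace_id: int) -> str:
    """Format a 128-bit OpenTelemetry trace ID the way X-Ray displays it."""
    hex_id = f"{otel_trace_id:032x}"
    # With an X-Ray ID generator, the first 8 hex digits are the epoch seconds
    return f"1-{hex_id[:8]}-{hex_id[8:]}"


def make_trace_id_processor(get_trace_id: Callable[[], int]):
    """Build a structlog processor that adds xray_trace_id when a span is active."""
    def processor(logger: Any, method_name: str,
                  event_dict: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
        trace_id = get_trace_id()
        if trace_id:  # OpenTelemetry uses 0 for "no valid span"
            event_dict["xray_trace_id"] = to_xray_trace_id(trace_id)
        return event_dict
    return processor
```

In the server, the getter would be `lambda: trace.get_current_span().get_span_context().trace_id` (which yields 0 when no span is active), and the processor would sit in the `structlog.configure(processors=[...])` list before the JSON renderer.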
ADOT sidecar: run the `public.ecr.aws/aws-observability/aws-otel-collector:latest` container as a sidecar in your ECS task definition. Configure it with an `otel-config.yaml` that receives OTLP on port 4317 and exports to AWS X-Ray. Your application needs no X-Ray SDK, only the standard OpenTelemetry APIs.

Metrics are only useful if someone sees them. A well-designed CloudWatch dashboard gives your team an at-a-glance health view of all MCP servers. Alarms with SNS notifications ensure someone is paged when thresholds are breached, before users start filing bug reports.
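A minimal collector configuration for that sidecar might look like the YAML below, held in a Python string so it can be attached to an ECS container definition. This is a sketch under assumptions: it uses the collector's `otlp` receiver, `batch` processor, and `awsxray` exporter, and it assumes the ADOT image reads its configuration from the `AOT_CONFIG_CONTENT` environment variable; verify both against the ADOT ECS documentation for your image version. `adot_sidecar` and the log group name are hypothetical.

```python
from string import Template

# OTLP/gRPC in on 4317, batched, exported to X-Ray in the given region
OTEL_CONFIG_TEMPLATE = Template("""\
receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317
processors:
  batch:
exporters:
  awsxray:
    region: $region
service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [batch]
      exporters: [awsxray]
""")


def adot_sidecar(region: str = "us-east-1") -> dict:
    """Hypothetical ECS container definition for the ADOT collector sidecar."""
    return {
        "name": "aws-otel-collector",
        "image": "public.ecr.aws/aws-observability/aws-otel-collector:latest",
        "essential": True,
        # Assumption: ADOT loads its pipeline config from this variable
        "environment": [{"name": "AOT_CONFIG_CONTENT",
                         "value": OTEL_CONFIG_TEMPLATE.substitute(region=region)}],
        "logConfiguration": {
            "logDriver": "awslogs",
            "options": {
                "awslogs-group": "/ecs/aws-otel-collector",
                "awslogs-region": region,
                "awslogs-stream-prefix": "ecs",
            },
        },
    }
```

The returned dict would go into the task definition's `containerDefinitions` list next to the MCP server container. The task role still needs permission to write trace segments to X-Ray (for example via the `AWSXRayDaemonWriteAccess` managed policy).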
Python (boto3) — create MCP alert topic + alarms

```python
import boto3

cw = boto3.client("cloudwatch", region_name="us-east-1")
sns = boto3.client("sns", region_name="us-east-1")

# ── SNS topic for MCP alerts (create_topic is idempotent) ──────────
# Subscribe your pager/email/chat endpoint to this topic separately.
topic_arn = sns.create_topic(Name="mcp-alerts")["TopicArn"]

# Both alarms target the Environment-only dimension set, so the EMF
# emitter must publish that rollup in addition to per-tool dimensions.
ENV_DIMENSIONS = [{"Name": "Environment", "Value": "production"}]


def _metric(metric_id: str, name: str) -> dict:
    """A 5-minute Sum of one MCP/ToolServer metric, used as a math input."""
    return {
        "Id": metric_id,
        "MetricStat": {
            "Metric": {"Namespace": "MCP/ToolServer", "MetricName": name,
                       "Dimensions": ENV_DIMENSIONS},
            "Period": 300,
            "Stat": "Sum",
        },
        "ReturnData": False,
    }


# ── Alarm: tool error rate > 5% (errors / calls, not a raw count) ──
cw.put_metric_alarm(
    AlarmName="MCP-HighToolErrorRate",
    AlarmDescription="MCP tool error rate exceeds 5% over 5 minutes",
    Metrics=[
        _metric("errors", "tool_error_count"),
        _metric("calls", "tool_call_count"),
        {"Id": "error_rate", "Expression": "100 * errors / calls",
         "Label": "Tool error rate (%)", "ReturnData": True},
    ],
    EvaluationPeriods=1,
    Threshold=5.0,
    ComparisonOperator="GreaterThanThreshold",
    TreatMissingData="notBreaching",
    AlarmActions=[topic_arn],
    OKActions=[topic_arn],
)

# ── Alarm: p99 latency > 2 s for two consecutive 5-minute periods ──
cw.put_metric_alarm(
    AlarmName="MCP-HighP99Latency",
    AlarmDescription="MCP p99 tool latency exceeds 2000 ms",
    Namespace="MCP/ToolServer",
    MetricName="ToolDuration",
    Dimensions=ENV_DIMENSIONS,
    Period=300,
    EvaluationPeriods=2,
    ExtendedStatistic="p99",
    Threshold=2000.0,
    ComparisonOperator="GreaterThanThreshold",
    TreatMissingData="notBreaching",
    AlarmActions=[topic_arn],
)
```
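The alarms handle paging; the at-a-glance view still needs a dashboard. The sketch below builds one with CloudWatch's `put_dashboard`, which takes the dashboard definition as a JSON string. The widget layout, titles, dashboard name, and the `build_dashboard_body` / `create_dashboard` helpers are illustrative assumptions; the metric names and Environment dimension follow the EMF example.

```python
import json

NAMESPACE = "MCP/ToolServer"


def metric_widget(title: str, metrics: list, x: int, y: int, region: str) -> dict:
    """One 12x6 time-series metric widget."""
    return {
        "type": "metric", "x": x, "y": y, "width": 12, "height": 6,
        "properties": {"title": title, "metrics": metrics, "period": 300,
                       "region": region, "view": "timeSeries"},
    }


def build_dashboard_body(environment: str, region: str) -> dict:
    dims = ["Environment", environment]
    return {"widgets": [
        metric_widget("Tool calls / 5 min",
                      [[NAMESPACE, "tool_call_count", *dims, {"stat": "Sum"}]], 0, 0, region),
        metric_widget("Tool errors / 5 min",
                      [[NAMESPACE, "tool_error_count", *dims, {"stat": "Sum"}]], 12, 0, region),
        metric_widget("Tool latency (ms)",
                      [[NAMESPACE, "ToolDuration", *dims, {"stat": s}] for s in ("p50", "p95", "p99")],
                      0, 6, region),
    ]}


def create_dashboard(cloudwatch, environment: str = "production",
                     region: str = "us-east-1", name: str = "mcp-health") -> None:
    """Create or overwrite the dashboard; `cloudwatch` is a boto3 CloudWatch client."""
    cloudwatch.put_dashboard(
        DashboardName=name,
        DashboardBody=json.dumps(build_dashboard_body(environment, region)),
    )
```

In the setup script, this would be called as `create_dashboard(boto3.client("cloudwatch", region_name="us-east-1"))`.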
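Tip: use `put_composite_alarm` to combine the error-rate alarm and the latency alarm with an AND rule, and move the SNS action to the composite (removing it from the two child alarms) so each incident pages once. Paging only when both are breached at the same time filters out brief latency spikes that don't affect error rates, which can substantially reduce alert fatigue. The trade-off is that a pure latency regression with no errors will not page on its own.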
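A sketch of that composite alarm follows. It assumes the two child alarms exist under the names `MCP-HighToolErrorRate` and `MCP-HighP99Latency` and no longer carry their own `AlarmActions`; `alarm_rule`, `create_composite_page`, and the composite's name are hypothetical.

```python
def alarm_rule(*alarm_names: str) -> str:
    """Composite rule that is true only when every named alarm is in ALARM."""
    return " AND ".join(f'ALARM("{name}")' for name in alarm_names)


def create_composite_page(cloudwatch, topic_arn: str) -> None:
    """Create or overwrite the paging composite; `cloudwatch` is a boto3 CloudWatch client."""
    cloudwatch.put_composite_alarm(
        AlarmName="MCP-ErrorsAndLatency",
        AlarmDescription="Page when error rate and p99 latency are both breached",
        AlarmRule=alarm_rule("MCP-HighToolErrorRate", "MCP-HighP99Latency"),
        ActionsEnabled=True,
        AlarmActions=[topic_arn],
        OKActions=[topic_arn],
    )
```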