You've spent 29 days learning MCP. Today, combine everything into a single production-grade enterprise platform: multi-server architecture, API key + OAuth auth, Redis rate limiting, AWS service integrations, Bedrock AI routing, SSE streaming, context management, monitoring, and IaC deployment.
The enterprise MCP platform is a fleet of specialised servers behind a secure, observable gateway. Each server is responsible for one domain. The router aggregates them and presents a unified tool surface to Claude and custom MCP clients.
| Component | Technology | Purpose |
|---|---|---|
| API Gateway | AWS API GW + WAF | TLS termination, DDoS protection, throttling |
| Auth layer | API key + Cognito JWT | Authenticate every tool call |
| Rate limiter | ElastiCache Redis | Sliding-window 1,000 req/min per client |
| Router | FastMCP + ECS Fargate | Aggregate servers, namespace tools |
| Servers | FastMCP (Python) | Domain-specific tools (4 servers) |
| AI server | Bedrock Converse API | Specialists, KB search, caching |
| Observability | CloudWatch + X-Ray | Latency, errors, tool usage metrics |
| Deployment | AWS CDK (Python) | Infra-as-code; single cdk deploy |
Every server loads its configuration from AWS Secrets Manager at startup. A shared platform_config.py module resolves secrets and exposes typed settings — no environment variable spaghetti, no hardcoded credentials.
```python
# platform_config.py: shared across all servers
import json
from dataclasses import dataclass
from functools import lru_cache

import boto3


@dataclass
class PlatformConfig:
    aws_region: str
    bedrock_model: str
    kb_id: str
    dynamo_table: str
    redis_host: str
    allowed_origins: list[str]
    rate_limit_rpm: int
    log_level: str


@lru_cache(maxsize=1)
def load_config() -> PlatformConfig:
    # Fetch the JSON secret once; later calls return the cached object.
    sm = boto3.client("secretsmanager")
    raw = sm.get_secret_value(SecretId="mcp-platform/config")
    cfg = json.loads(raw["SecretString"])
    return PlatformConfig(**cfg)


# Each server imports this at the top:
# from platform_config import load_config
# cfg = load_config()
```
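To see the caching behaviour without touching AWS, here is a minimal sketch that swaps Secrets Manager for an in-memory dictionary. `DemoConfig`, `fetch_secret`, and `load_demo_config` are hypothetical names used only for illustration. The sketch also filters out unknown keys before constructing the dataclass, which is one way (an assumption, not part of the module above) to stop an extra key in the secret from crashing startup.

```python
from dataclasses import dataclass, fields
from functools import lru_cache


@dataclass(frozen=True)
class DemoConfig:
    aws_region: str
    rate_limit_rpm: int


# Hypothetical stand-in for the Secrets Manager payload.
_FAKE_SECRET = {"aws_region": "us-east-1", "rate_limit_rpm": 1000, "unexpected_key": "ignored"}
fetch_count = 0


def fetch_secret() -> dict:
    """Pretend network call; counts how often it is invoked."""
    global fetch_count
    fetch_count += 1
    return dict(_FAKE_SECRET)


@lru_cache(maxsize=1)
def load_demo_config() -> DemoConfig:
    raw = fetch_secret()
    known = {f.name for f in fields(DemoConfig)}
    # Keep only the keys the dataclass declares.
    return DemoConfig(**{k: v for k, v in raw.items() if k in known})


first = load_demo_config()
second = load_demo_config()          # served from the cache
calls_before_clear = fetch_count

load_demo_config.cache_clear()       # e.g. after a secret rotation
third = load_demo_config()           # fetches again
```

Calling `cache_clear()` is the standard `functools` way to force a reload; whether and when to do that in a long-running server is a design decision this sketch leaves open.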
```python
# router.py: the MCP router (aggregates all servers)
import asyncio

from fastmcp import Client, FastMCP
from fastmcp.client.transports import SSETransport

from platform_config import load_config

cfg = load_config()
router = FastMCP("enterprise-router")

SERVERS = {
    "data": SSETransport("http://data-server:8080/sse"),
    "ai": SSETransport("http://ai-server:8081/sse"),
    "ops": SSETransport("http://ops-server:8082/sse"),
    "secrets": SSETransport("http://secrets-server:8083/sse"),
}


async def register_all():
    """Discover each server's tools and re-expose them as <namespace>__<tool>."""
    for ns, transport in SERVERS.items():
        async with Client(transport) as client:
            tools = await client.list_tools()
        for tool in tools:
            # Bind ns, tool and transport as defaults so each proxy keeps its own values.
            def make_proxy(ns=ns, t=tool, tr=transport):
                async def proxy(**kwargs):
                    async with Client(tr) as c:
                        return await c.call_tool(t.name, kwargs)

                proxy.__name__ = f"{ns}__{t.name}"
                proxy.__doc__ = t.description
                return proxy

            # Caveat: some FastMCP releases reject tool functions that take **kwargs,
            # and the add_tool signature has changed across versions. Check this
            # against the version you have installed.
            router.add_tool(make_proxy(), name=f"{ns}__{tool.name}")


if __name__ == "__main__":
    asyncio.run(register_all())
    router.run(transport="sse", host="0.0.0.0", port=8080)
```
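The namespacing convention used by the router (`<namespace>__<tool>`) can be sketched in plain Python, independent of FastMCP. The helper names below (`namespaced`, `split_namespaced`, `build_routing_table`) are hypothetical and only illustrate how two servers can expose a tool with the same name without colliding.

```python
SEPARATOR = "__"


def namespaced(ns: str, tool: str) -> str:
    """Build the public tool name for a tool owned by server `ns`."""
    if SEPARATOR in ns:
        raise ValueError(f"namespace {ns!r} must not contain {SEPARATOR!r}")
    return f"{ns}{SEPARATOR}{tool}"


def split_namespaced(name: str) -> tuple[str, str]:
    """Split a public tool name back into (namespace, original tool name)."""
    ns, sep, tool = name.partition(SEPARATOR)  # splits on the first separator only
    if not sep or not ns or not tool:
        raise ValueError(f"{name!r} is not a namespaced tool name")
    return ns, tool


def build_routing_table(tools_by_server: dict[str, list[str]]) -> dict[str, tuple[str, str]]:
    """Map every public tool name to the (server, tool) pair that serves it."""
    table: dict[str, tuple[str, str]] = {}
    for ns, tools in tools_by_server.items():
        for tool in tools:
            table[namespaced(ns, tool)] = (ns, tool)
    return table


# Both servers expose kb_search; the namespace keeps them distinct.
routes = build_routing_table({"data": ["s3_get", "kb_search"], "ai": ["kb_search"]})
```

Because the split happens on the first separator, a tool whose own name contains `__` still round-trips, as long as namespaces themselves never contain it.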
All requests pass through an auth + rate limiting middleware before reaching any MCP tool. This is a FastAPI gateway that wraps the MCP router — validating API keys, checking Redis for rate limits, and forwarding clean requests.
```python
# gateway.py: FastAPI wrapper around the MCP router
import hashlib
import time
import uuid

import httpx
import redis.asyncio as aioredis
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import StreamingResponse

from platform_config import load_config

cfg = load_config()
app = FastAPI()
redis = aioredis.from_url(f"redis://{cfg.redis_host}")
VALID_KEYS = set()  # SHA-256 hashes of valid keys, loaded from Secrets Manager on startup

# Sliding-window limiter; the script runs atomically inside Redis.
RATE_LIMIT_LUA = """
local key, now, window, limit = KEYS[1], tonumber(ARGV[1]), tonumber(ARGV[2]), tonumber(ARGV[3])
redis.call('ZREMRANGEBYSCORE', key, 0, now - window)
local count = redis.call('ZCARD', key)
if count >= limit then return 0 end
redis.call('ZADD', key, now, ARGV[4])
redis.call('PEXPIRE', key, window)
return 1
"""


async def check_auth(request: Request) -> str:
    key = request.headers.get("X-API-Key", "")
    hashed = hashlib.sha256(key.encode()).hexdigest()
    if hashed not in VALID_KEYS:
        raise HTTPException(401, "Invalid API key")
    return hashed  # the key hash doubles as the client ID


async def check_rate_limit(client_id: str):
    key = f"rl:{client_id}"
    now_ms = int(time.time() * 1000)
    window_ms = 60_000
    limit = cfg.rate_limit_rpm
    # A unique member per request: using the timestamp as the member would
    # collapse two requests in the same millisecond into one entry.
    member = f"{now_ms}-{uuid.uuid4().hex}"
    allowed = await redis.eval(RATE_LIMIT_LUA, 1, key, now_ms, window_ms, limit, member)
    if not allowed:
        raise HTTPException(429, "Rate limit exceeded")


@app.post("/mcp/{path:path}")
async def proxy(path: str, request: Request):
    client_id = await check_auth(request)
    await check_rate_limit(client_id)
    body = await request.body()
    # Forward to the internal MCP router
    async with httpx.AsyncClient() as client:
        resp = await client.post(
            f"http://router:8080/{path}",
            content=body,
            headers={"Content-Type": "application/json"},
        )
    # The upstream body is fully buffered here before being returned.
    return StreamingResponse(
        iter([resp.content]),
        media_type=resp.headers.get("content-type", "application/json"),
    )
```
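The sliding-window logic in the gateway's Lua script can be hard to reason about inside a string. The sketch below mirrors the same three steps (evict old entries, count, record) in an in-memory Python class so the behaviour can be checked without Redis. `SlidingWindowLimiter` is a hypothetical name, and this is an illustration of the algorithm, not a replacement for the Redis version, which is what gives you a shared limit across gateway replicas.

```python
from collections import deque


class SlidingWindowLimiter:
    """In-memory sliding-window limiter; one deque of timestamps per client."""

    def __init__(self, limit: int, window_ms: int) -> None:
        self.limit = limit
        self.window_ms = window_ms
        self._hits: dict[str, deque[int]] = {}

    def allow(self, client_id: str, now_ms: int) -> bool:
        hits = self._hits.setdefault(client_id, deque())
        # Step 1: evict entries at or before the window start
        # (the equivalent of ZREMRANGEBYSCORE key 0 now-window).
        while hits and hits[0] <= now_ms - self.window_ms:
            hits.popleft()
        # Step 2: reject if the window is already full (ZCARD >= limit).
        if len(hits) >= self.limit:
            return False
        # Step 3: record this request (ZADD). Duplicate timestamps are kept,
        # so two requests in the same millisecond both count.
        hits.append(now_ms)
        return True


limiter = SlidingWindowLimiter(limit=3, window_ms=60_000)
results = [limiter.allow("client-a", t) for t in (0, 1_000, 2_000, 3_000, 60_000, 60_000)]
other_client = limiter.allow("client-b", 3_000)
```

The fourth request is rejected because three requests already sit in the window; at 60,000 ms the request from time 0 has aged out, so one more is admitted and the next is rejected again.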
The data server bundles all AWS service tools into a single deployable unit. Every tool runs with the IAM role credentials attached to the ECS task definition, so access is limited to what that role's policies allow and no keys are hardcoded.
```python
# data_server.py: S3 + DynamoDB + Bedrock KB
import json

import boto3
from fastmcp import FastMCP

from platform_config import load_config

cfg = load_config()
mcp = FastMCP("data-server")

# Clients pick up credentials from the ECS task role.
s3 = boto3.client("s3")
ddb = boto3.resource("dynamodb")
br = boto3.client("bedrock-agent-runtime")
table = ddb.Table(cfg.dynamo_table)


@mcp.tool()
async def s3_get(bucket: str, key: str) -> str:
    """Read an S3 object (max 50KB returned)."""
    obj = s3.get_object(Bucket=bucket, Key=key)
    body = obj["Body"].read(51200)  # 50 * 1024 bytes
    try:
        return body.decode()
    except UnicodeDecodeError:
        return f"[Binary: {len(body)} bytes]"


@mcp.tool()
async def dynamo_get(pk: str, sk: str) -> str:
    """Retrieve a DynamoDB item by primary key."""
    resp = table.get_item(Key={"pk": pk, "sk": sk})
    item = resp.get("Item")
    return json.dumps(item, default=str) if item else "Not found."


@mcp.tool()
async def kb_search(query: str, top_k: int = 5) -> str:
    """Semantic search over the Bedrock Knowledge Base."""
    resp = br.retrieve(
        knowledgeBaseId=cfg.kb_id,
        retrievalQuery={"text": query},
        retrievalConfiguration={"vectorSearchConfiguration": {"numberOfResults": top_k}},
    )
    out = []
    for r in resp["retrievalResults"]:
        # Relevance score followed by the first 400 characters of the passage.
        out.append(f"[{round(r['score'], 3)}] {r['content']['text'][:400]}")
    return "\n\n".join(out)


if __name__ == "__main__":
    mcp.run(transport="sse", host="0.0.0.0", port=8080)
```
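One thing to watch in the data server: the tools are declared `async`, but boto3 calls are synchronous and will block the event loop while they wait on the network. A common remedy is to push the blocking call onto a worker thread with `asyncio.to_thread` (Python 3.9+). The sketch below shows the pattern with a hypothetical `blocking_fetch` function standing in for a boto3 call; it is an illustration of the technique, not the lesson's own implementation.

```python
import asyncio
import time


def blocking_fetch(key: str) -> str:
    """Hypothetical stand-in for a synchronous boto3 call."""
    time.sleep(0.05)  # simulate network latency
    return f"value-for-{key}"


async def fetch_tool(key: str) -> str:
    # Run the blocking function in a worker thread so the event loop
    # stays free to serve other tool calls in the meantime.
    return await asyncio.to_thread(blocking_fetch, key)


async def main() -> list[str]:
    # Both calls overlap instead of running back to back.
    return await asyncio.gather(fetch_tool("a"), fetch_tool("b"))


results = asyncio.run(main())
```

Applied to the data server, the body of `s3_get` would wrap `s3.get_object` (and the subsequent `read`) in the same way.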
Production MCP platforms need three observability pillars: structured logs for debugging, custom CloudWatch metrics for dashboards and alarms, and X-Ray traces for request flow across services.
```python
# observability.py: shared middleware for all servers
import json
import logging
import time
import uuid
from functools import wraps

import boto3

cw = boto3.client("cloudwatch")
logger = logging.getLogger("mcp-platform")
logging.basicConfig(format="%(message)s", level=logging.INFO)


def instrument(server_name: str):
    """Decorator: logs every tool call with latency + emits CloudWatch metrics."""

    def decorator(fn):
        @wraps(fn)
        async def wrapper(*args, **kwargs):
            rid = str(uuid.uuid4())[:8]  # short request ID for log correlation
            t0 = time.time()
            status = "success"
            try:
                result = await fn(*args, **kwargs)
                return result
            except Exception as e:
                status = "error"
                logger.error(json.dumps({
                    "rid": rid, "server": server_name,
                    "tool": fn.__name__, "error": str(e),
                }))
                raise
            finally:
                ms = (time.time() - t0) * 1000
                logger.info(json.dumps({
                    "rid": rid, "server": server_name, "tool": fn.__name__,
                    "latency_ms": round(ms, 2), "status": status,
                }))
                # Emit custom metrics
                cw.put_metric_data(
                    Namespace="MCPPlatform",
                    MetricData=[
                        {"MetricName": "ToolLatency", "Value": ms, "Unit": "Milliseconds",
                         "Dimensions": [{"Name": "Server", "Value": server_name},
                                        {"Name": "Tool", "Value": fn.__name__}]},
                        {"MetricName": "ToolInvocations", "Value": 1, "Unit": "Count",
                         "Dimensions": [{"Name": "Status", "Value": status}]},
                    ],
                )

        return wrapper

    return decorator


# Usage: @instrument("data-server") on every @mcp.tool()
```
Key metrics to track: ToolLatency P99, ToolInvocations/min, ErrorRate, RateLimitHits, CacheHitRate. Set CloudWatch alarms on P99 > 3s and ErrorRate > 1%.
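To make the two alarm thresholds concrete, here is a small sketch that computes a P99 latency and an error rate from raw samples and checks them against the 3 s and 1% limits. In production CloudWatch computes these statistics for you; the helper names (`percentile`, `error_rate`) and the sample data are hypothetical, and the percentile uses the simple nearest-rank method, which may differ slightly from CloudWatch's own calculation.

```python
def percentile(samples: list[float], pct: int) -> float:
    """Nearest-rank percentile: smallest value with at least pct% of samples at or below it."""
    if not samples:
        raise ValueError("no samples")
    ordered = sorted(samples)
    rank = (pct * len(ordered) + 99) // 100  # integer ceiling of pct/100 * n
    return ordered[max(rank, 1) - 1]


def error_rate(statuses: list[str]) -> float:
    """Fraction of calls whose status is 'error'."""
    if not statuses:
        raise ValueError("no statuses")
    return sum(1 for s in statuses if s == "error") / len(statuses)


# Hypothetical one-minute sample: latencies of 1..100 ms, 2 errors in 100 calls.
latencies_ms = [float(v) for v in range(1, 101)]
statuses = ["success"] * 98 + ["error"] * 2

p99 = percentile(latencies_ms, 99)
rate = error_rate(statuses)

latency_alarm = p99 > 3_000   # P99 above 3 s
error_alarm = rate > 0.01     # error rate above 1%
```

With this sample the latency alarm stays quiet while the error-rate alarm fires, which is the kind of split signal the two separate alarms are meant to surface.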
Add aws_xray_sdk to ECS tasks. Traces flow: API GW → Gateway → Router → Server → AWS Service. Visualise latency in the X-Ray Service Map.
The entire platform deploys with AWS CDK. One stack creates the VPC, ECS cluster, five Fargate services (the router plus four domain servers), ElastiCache Redis, API Gateway, and all IAM roles. One command deploys everything: cdk deploy MCPPlatformStack.
```python
# cdk_stack.py: abbreviated key resources
from aws_cdk import (
    Stack,
    aws_ec2 as ec2,
    aws_ecs as ecs,
    aws_elasticache as cache,
    aws_iam as iam,
    aws_apigateway as apigw,
    aws_logs as logs,
)
from constructs import Construct


class MCPPlatformStack(Stack):
    def __init__(self, scope, id, **kwargs):
        super().__init__(scope, id, **kwargs)

        # VPC
        vpc = ec2.Vpc(self, "PlatformVPC", max_azs=2)

        # ECS cluster
        cluster = ecs.Cluster(self, "MCPCluster", vpc=vpc, container_insights=True)

        # Task IAM role. NOTE: AmazonBedrockFullAccess and resources=["*"] are
        # broad placeholders; scope both down to specific actions and ARNs
        # before calling this least-privilege.
        task_role = iam.Role(
            self, "MCPTaskRole",
            assumed_by=iam.ServicePrincipal("ecs-tasks.amazonaws.com"),
        )
        task_role.add_managed_policy(
            iam.ManagedPolicy.from_aws_managed_policy_name("AmazonBedrockFullAccess")
        )
        task_role.add_to_policy(iam.PolicyStatement(
            actions=["s3:GetObject", "dynamodb:GetItem", "dynamodb:PutItem",
                     "secretsmanager:GetSecretValue", "cloudwatch:PutMetricData"],
            resources=["*"],
        ))

        # One Fargate service per server (router + 4 domain servers)
        for name, port in [("router", 8080), ("data", 8080), ("ai", 8081),
                           ("ops", 8082), ("secrets", 8083)]:
            td = ecs.FargateTaskDefinition(
                self, f"TD-{name}",
                task_role=task_role, cpu=256, memory_limit_mib=512,
            )
            td.add_container(
                f"mcp-{name}",
                # Placeholder registry URI; substitute your own account and region.
                image=ecs.ContainerImage.from_registry(
                    f"123456789.dkr.ecr.us-east-1.amazonaws.com/mcp-{name}:latest"),
                port_mappings=[ecs.PortMapping(container_port=port)],
                logging=ecs.LogDrivers.aws_logs(
                    stream_prefix=f"mcp-{name}",
                    log_group=logs.LogGroup(
                        self, f"LG-{name}",
                        retention=logs.RetentionDays.ONE_WEEK,
                    ),
                ),
            )
            ecs.FargateService(
                self, f"SVC-{name}",
                cluster=cluster, task_definition=td,
                desired_count=2,  # 2 replicas per service
            )
```
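The stack's policy statement grants its actions on every resource. As a rough sketch of what a tighter policy could look like, the snippet below builds an IAM policy document as a plain dictionary with each action tied to a specific ARN. All ARNs, the account ID `111122223333`, and the bucket and table names are hypothetical placeholders. `cloudwatch:PutMetricData` does not support resource-level permissions, so it keeps `"*"` but is restricted to the platform's metric namespace through the `cloudwatch:namespace` condition key.

```python
import json


def scoped_statement(actions: list[str], resources: list[str]) -> dict:
    """Build an Allow statement, refusing a bare wildcard resource."""
    if "*" in resources:
        raise ValueError("use explicit ARNs instead of '*'")
    return {"Effect": "Allow", "Action": sorted(actions), "Resource": list(resources)}


# Hypothetical ARNs for illustration only.
policy = {
    "Version": "2012-10-17",
    "Statement": [
        scoped_statement(["s3:GetObject"], ["arn:aws:s3:::example-mcp-bucket/*"]),
        scoped_statement(
            ["dynamodb:GetItem", "dynamodb:PutItem"],
            ["arn:aws:dynamodb:us-east-1:111122223333:table/example-mcp-table"],
        ),
        scoped_statement(
            ["secretsmanager:GetSecretValue"],
            ["arn:aws:secretsmanager:us-east-1:111122223333:secret:mcp-platform/config-*"],
        ),
        {
            # PutMetricData has no resource-level scoping; restrict by namespace instead.
            "Effect": "Allow",
            "Action": ["cloudwatch:PutMetricData"],
            "Resource": ["*"],
            "Condition": {"StringEquals": {"cloudwatch:namespace": "MCPPlatform"}},
        },
    ],
}

policy_json = json.dumps(policy, indent=2)
```

In CDK the same idea maps onto one `iam.PolicyStatement` per entry, each with its own `resources` list (and `conditions` for the CloudWatch statement).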
cdk bootstrap → docker build & push × 5 images → cdk deploy MCPPlatformStack. The stack outputs the API Gateway URL — paste it into your Claude Code mcp_servers.json.
You've completed the full MCP curriculum. Here's every skill you've built — each one a building block of the enterprise platform you just deployed.
| Phase | Days | Skills Mastered |
|---|---|---|
| 🌱 Spark | 1–10 | MCP fundamentals, FastMCP setup, tools/resources/prompts, stdio transport, Claude Code integration, file tools, web tools, database tools, async patterns |
| 🔥 Forge | 11–20 | Testing/debugging, performance optimisation, error handling, production packaging, monitoring, Docker, composing servers, protocol deep-dive, versioning, SSE streaming |
| ⚡ Ascend | 21–30 | AWS integrations (S3/DDB/Lambda), Bedrock AI server, auth (API key + OAuth + RBAC), rate limiting (Redis), security & input validation, multi-server orchestration, MCP clients, context engineering, capstone platform |
The CDK stack sets desired_count=2 for each Fargate service. What does this achieve?

What is the purpose of @lru_cache(maxsize=1) on load_config()? lru_cache(maxsize=1) memoises the result: the first call fetches from Secrets Manager, and every subsequent call returns the cached PlatformConfig object instantly.

You've completed 30 Days of Mastering MCP, from running your first tool to deploying a production enterprise platform on AWS. You're now ready to build, secure, and scale agentic AI systems that power real-world workflows.